diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index d43864c939cb63..8022e07b7ea332 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -201,6 +201,12 @@ def get_preparation_data(name): main_path = os.path.join(process.ORIGINAL_DIR, main_path) d['init_main_from_path'] = os.path.normpath(main_path) + # see gh-125828: workaround on MacOS to emulate `get_value` on Semaphore. + if sys.platform == 'darwin': + import _multiprocessing + d['_macosx_sharedmem_name'] = _multiprocessing._MACOSX_SHAREDMEM_NAME + d['_macosx_shmlock_name'] = _multiprocessing._MACOSX_SHMLOCK_NAME + return d # @@ -245,6 +251,18 @@ def prepare(data): elif 'init_main_from_path' in data: _fixup_main_from_path(data['init_main_from_path']) + # see gh-125828: workaround on MacOS to emulate `get_value` on Semaphore. + if sys.platform == 'darwin' and '_macosx_sharedmem_name' in data: + import _multiprocessing + _multiprocessing._set_shm_names( + data['_macosx_sharedmem_name'], + data['_macosx_shmlock_name'] + ) + # Update module attributes so grandchild processes also get + # the correct names when get_preparation_data() is called. + _multiprocessing._MACOSX_SHAREDMEM_NAME = data['_macosx_sharedmem_name'] + _multiprocessing._MACOSX_SHMLOCK_NAME = data['_macosx_shmlock_name'] + # Multiprocessing module helpers to fix up the main module in # spawned subprocesses def _fixup_main_from_name(mod_name): diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 580d9f2b32544e..c4e7ca3e473b6c 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -138,8 +138,12 @@ def _resource_unlink(name, rtype): WAIT_ACTIVE_CHILDREN_TIMEOUT = 5.0 -HAVE_GETVALUE = not getattr(_multiprocessing, - 'HAVE_BROKEN_SEM_GETVALUE', False) +# Since gh-125828, we no longer need HAVE_GETVALUE. +# This value should be remove from Modules/_multiprocessing/multiprocessing.c. +# when cleanup is complete. 
+# ------------------- +# HAVE_GETVALUE = not getattr(_multiprocessing, +# 'HAVE_BROKEN_SEM_GETVALUE', False) WIN32 = (sys.platform == "win32") @@ -1750,10 +1754,8 @@ def test_semaphore(self): def test_bounded_semaphore(self): sem = self.BoundedSemaphore(2) self._test_semaphore(sem) - # Currently fails on OS/X - #if HAVE_GETVALUE: - # self.assertRaises(ValueError, sem.release) - # self.assertReturnsIfImplemented(2, get_value, sem) + self.assertRaises(ValueError, sem.release) + self.assertReturnsIfImplemented(2, get_value, sem) def test_timeout(self): if self.TYPE != 'processes': @@ -7380,6 +7382,63 @@ def test_preload_main_large_sys_argv(self): '', ]) +# +# Tests for workaround macOSX Semaphore +# + +ACQUIRE, RELEASE = range(2) +@unittest.skipIf(sys.platform != "darwin", "MacOSX only") +class _TestMacOSXSemaphore(BaseTestCase): + ALLOWED_TYPES = ('processes',) + @classmethod + def _run_thread(cls, sem, meth, ntime, delay): + if meth == ACQUIRE: + for _ in range(ntime): + sem.acquire() + time.sleep(delay) + else: + for _ in range(ntime): + sem.release() + time.sleep(delay) + + @classmethod + def _run_process(cls, sem, sem_meth, nthread=1, ntime=10, delay=0.1): + ts = [] + for _ in range(nthread): + t = threading.Thread(target=cls._run_thread, + args=(sem, sem_meth, ntime, delay)) + ts.append(t) + for t in ts: + t.start() + for t in ts: + t.join() + + def test_mix_several_acquire_release(self): + # n processes, threads per process and loops per threads + n_p_acq, n_th_acq, n_loop_acq = 15, 5, 20 + n_p_rel, n_th_rel, n_loop_rel = 8, 8, 8 + + n_acq = n_p_acq*n_th_acq*n_loop_acq + n_rel = n_p_rel*n_th_rel*n_loop_rel + sem = self.Semaphore(n_acq) + ps = [] + for _ in range(n_p_acq): + p = self.Process(target=self._run_process, + args=(sem, ACQUIRE, n_th_acq, n_loop_acq, 0.01)) + ps.append(p) + + for _ in range(n_p_rel): + p = self.Process(target=self._run_process, + args=(sem, RELEASE, n_th_rel, n_loop_rel, 0.005)) + ps.append(p) + + for p in ps: + p.start() + for p in 
ps: + p.join() + self.assertEqual(sem.get_value(), n_rel) + + # # Mixins # diff --git a/Misc/NEWS.d/next/macOS/2025-03-06-18-52-05.gh-issue-125828.JkMjD2.rst b/Misc/NEWS.d/next/macOS/2025-03-06-18-52-05.gh-issue-125828.JkMjD2.rst new file mode 100644 index 00000000000000..8a82d2a3072646 --- /dev/null +++ b/Misc/NEWS.d/next/macOS/2025-03-06-18-52-05.gh-issue-125828.JkMjD2.rst @@ -0,0 +1 @@ +Fix the not implemented ``get_value`` for :class:`multiprocessing.Semaphore` on MacOSX by adding a dedicated workaround in ``_multiprocessing.SemLock`` object. diff --git a/Modules/_multiprocessing/clinic/semaphore.c.h b/Modules/_multiprocessing/clinic/semaphore.c.h index 6b1c0092ce4816..e45036e2dcc809 100644 --- a/Modules/_multiprocessing/clinic/semaphore.c.h +++ b/Modules/_multiprocessing/clinic/semaphore.c.h @@ -401,44 +401,56 @@ _multiprocessing_SemLock__is_mine(PyObject *self, PyObject *Py_UNUSED(ignored)) #if defined(HAVE_MP_SEMAPHORE) -PyDoc_STRVAR(_multiprocessing_SemLock__get_value__doc__, -"_get_value($self, /)\n" +PyDoc_STRVAR(_multiprocessing_SemLock__is_zero__doc__, +"_is_zero($self, /)\n" "--\n" "\n" -"Get the value of the semaphore."); +"Return whether semaphore has value zero."); -#define _MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF \ - {"_get_value", (PyCFunction)_multiprocessing_SemLock__get_value, METH_NOARGS, _multiprocessing_SemLock__get_value__doc__}, +#define _MULTIPROCESSING_SEMLOCK__IS_ZERO_METHODDEF \ + {"_is_zero", (PyCFunction)_multiprocessing_SemLock__is_zero, METH_NOARGS, _multiprocessing_SemLock__is_zero__doc__}, static PyObject * -_multiprocessing_SemLock__get_value_impl(SemLockObject *self); +_multiprocessing_SemLock__is_zero_impl(SemLockObject *self); static PyObject * -_multiprocessing_SemLock__get_value(PyObject *self, PyObject *Py_UNUSED(ignored)) +_multiprocessing_SemLock__is_zero(PyObject *self, PyObject *Py_UNUSED(ignored)) { - return _multiprocessing_SemLock__get_value_impl((SemLockObject *)self); + PyObject *return_value = NULL; + + 
Py_BEGIN_CRITICAL_SECTION(self); + return_value = _multiprocessing_SemLock__is_zero_impl((SemLockObject *)self); + Py_END_CRITICAL_SECTION(); + + return return_value; } #endif /* defined(HAVE_MP_SEMAPHORE) */ #if defined(HAVE_MP_SEMAPHORE) -PyDoc_STRVAR(_multiprocessing_SemLock__is_zero__doc__, -"_is_zero($self, /)\n" +PyDoc_STRVAR(_multiprocessing_SemLock__get_value__doc__, +"_get_value($self, /)\n" "--\n" "\n" -"Return whether semaphore has value zero."); +"Get the value of the semaphore."); -#define _MULTIPROCESSING_SEMLOCK__IS_ZERO_METHODDEF \ - {"_is_zero", (PyCFunction)_multiprocessing_SemLock__is_zero, METH_NOARGS, _multiprocessing_SemLock__is_zero__doc__}, +#define _MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF \ + {"_get_value", (PyCFunction)_multiprocessing_SemLock__get_value, METH_NOARGS, _multiprocessing_SemLock__get_value__doc__}, static PyObject * -_multiprocessing_SemLock__is_zero_impl(SemLockObject *self); +_multiprocessing_SemLock__get_value_impl(SemLockObject *self); static PyObject * -_multiprocessing_SemLock__is_zero(PyObject *self, PyObject *Py_UNUSED(ignored)) +_multiprocessing_SemLock__get_value(PyObject *self, PyObject *Py_UNUSED(ignored)) { - return _multiprocessing_SemLock__is_zero_impl((SemLockObject *)self); + PyObject *return_value = NULL; + + Py_BEGIN_CRITICAL_SECTION(self); + return_value = _multiprocessing_SemLock__get_value_impl((SemLockObject *)self); + Py_END_CRITICAL_SECTION(); + + return return_value; } #endif /* defined(HAVE_MP_SEMAPHORE) */ @@ -563,14 +575,14 @@ _multiprocessing_SemLock___exit__(PyObject *self, PyObject *const *args, Py_ssiz #define _MULTIPROCESSING_SEMLOCK__IS_MINE_METHODDEF #endif /* !defined(_MULTIPROCESSING_SEMLOCK__IS_MINE_METHODDEF) */ -#ifndef _MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF - #define _MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF -#endif /* !defined(_MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF) */ - #ifndef _MULTIPROCESSING_SEMLOCK__IS_ZERO_METHODDEF #define 
_MULTIPROCESSING_SEMLOCK__IS_ZERO_METHODDEF #endif /* !defined(_MULTIPROCESSING_SEMLOCK__IS_ZERO_METHODDEF) */ +#ifndef _MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF + #define _MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF +#endif /* !defined(_MULTIPROCESSING_SEMLOCK__GET_VALUE_METHODDEF) */ + #ifndef _MULTIPROCESSING_SEMLOCK__AFTER_FORK_METHODDEF #define _MULTIPROCESSING_SEMLOCK__AFTER_FORK_METHODDEF #endif /* !defined(_MULTIPROCESSING_SEMLOCK__AFTER_FORK_METHODDEF) */ @@ -582,4 +594,4 @@ _multiprocessing_SemLock___exit__(PyObject *self, PyObject *const *args, Py_ssiz #ifndef _MULTIPROCESSING_SEMLOCK___EXIT___METHODDEF #define _MULTIPROCESSING_SEMLOCK___EXIT___METHODDEF #endif /* !defined(_MULTIPROCESSING_SEMLOCK___EXIT___METHODDEF) */ -/*[clinic end generated code: output=d1e349d4ee3d4bbf input=a9049054013a1b77]*/ +/*[clinic end generated code: output=d795474992886c07 input=a9049054013a1b77]*/ diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c index 201cedbb59818f..aa3c45b4e499aa 100644 --- a/Modules/_multiprocessing/multiprocessing.c +++ b/Modules/_multiprocessing/multiprocessing.c @@ -182,6 +182,10 @@ static PyMethodDef module_methods[] = { #endif #if !defined(POSIX_SEMAPHORES_NOT_ENABLED) _MULTIPROCESSING_SEM_UNLINK_METHODDEF +#endif +#ifdef HAVE_BROKEN_SEM_GETVALUE + {"_set_shm_names", _multiprocessing_set_shm_names, METH_VARARGS, + "Set shared memory and glock names (used by spawned child processes)."}, #endif {NULL} }; @@ -230,6 +234,11 @@ multiprocessing_exec(PyObject *module) } Py_DECREF(py_sem_value_max); +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (_PyMp_init_shm_names(module) < 0) + return -1; +#endif + #endif /* Add configuration macros */ diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h index 099004b437828e..abcb5519b43ebc 100644 --- a/Modules/_multiprocessing/multiprocessing.h +++ b/Modules/_multiprocessing/multiprocessing.h @@ -100,5 +100,9 @@ PyObject 
*_PyMp_SetError(PyObject *Type, int num); extern PyType_Spec _PyMp_SemLockType_spec; extern PyObject *_PyMp_sem_unlink(const char *name); +#ifdef HAVE_BROKEN_SEM_GETVALUE + extern int _PyMp_init_shm_names(PyObject *module); + extern PyObject *_multiprocessing_set_shm_names(PyObject *module, PyObject *args); +#endif #endif /* MULTIPROCESSING_H */ diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c index 85cc0ac70a6563..f7a47ab381aca6 100644 --- a/Modules/_multiprocessing/semaphore.c +++ b/Modules/_multiprocessing/semaphore.c @@ -19,17 +19,6 @@ // These match the values in Lib/multiprocessing/synchronize.py enum { RECURSIVE_MUTEX, SEMAPHORE }; -typedef struct { - PyObject_HEAD - SEM_HANDLE handle; - unsigned long last_tid; - int count; - int maxvalue; - int kind; - char *name; -} SemLockObject; - -#define _SemLockObject_CAST(op) ((SemLockObject *)(op)) /*[python input] class SEM_HANDLE_converter(CConverter): @@ -45,10 +34,25 @@ class _multiprocessing.SemLock "SemLockObject *" "&_PyMp_SemLockType" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=935fb41b7d032599]*/ +#ifndef HAVE_BROKEN_SEM_GETVALUE + +typedef struct { + PyObject_HEAD + SEM_HANDLE handle; + unsigned long last_tid; + int count; + int maxvalue; + int kind; + char *name; +} SemLockObject; + +#define _SemLockObject_CAST(op) ((SemLockObject *)(op)) + #include "clinic/semaphore.c.h" #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid) +#endif /* !HAVE_BROKEN_SEM_GETVALUE */ #ifdef MS_WINDOWS @@ -56,6 +60,7 @@ class _multiprocessing.SemLock "SemLockObject *" "&_PyMp_SemLockType" * Windows definitions */ + #define SEM_FAILED NULL #define SEM_CLEAR_ERROR() SetLastError(0) @@ -253,7 +258,7 @@ sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save) tvdeadline.tv_sec = deadline->tv_sec; tvdeadline.tv_usec = deadline->tv_nsec / 1000; - for (delay = 0 ; ; delay += 1000) { + for (delay = 
0;; delay += 1000) { /* poll */ if (sem_trywait(sem) == 0) return 0; @@ -302,6 +307,590 @@ sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save) #endif /* !HAVE_SEM_TIMEDWAIT */ +#ifdef HAVE_BROKEN_SEM_GETVALUE +/* + +cf: https://github.com/python/cpython/issues/125828 + +On MacOSX, `sem_getvalue` is not implemented. This workaround proposes +to handle an internal value for each Semaphore ([R]Lock are out of scope) +in a shared memory available for each processed. The shared memory is created +with a dedicated lock to protect RW operations. Their 2 names are unique and based +on each main process id. + +This internal values are stored in a structure named CounterObject with: ++ the referenced semaphore name, ++ the current value, ++ the number of pending acquires, ++ the unlink flag (case of "fork" start method), ++ a created timestamp (PyDEBUG mode). + +A header with 3 members is created to manage all the CounterObject in the shared memory. ++ the count of CountedObject stored, ++ the count of available slots, ++ the size of shared memory. + +For each Semaphore (SemLock python class) a mutex is created in order to avoid data races +when the internal counters of CounterObject is updated or read. + ++ When impl/rebuid functions are called, a CounterObject and a mutex are associated + to the Semaphore, ++ When acquire/release functions are called, internal values + -internal_value and pending_axcquires - are updated. ++ When get_value function is called, the difference of internal values is returned. ++ When unlink is called, Semaphore and its associated mutex are unlink. + +When a semaphore is deleted, the associated CounterObject is reset. If no more semaphore +exists, the shared memory and associated lock are destroyed (close and unlink for each). + +Each new process, initializes a process structure. +When a new sempahore is created, we check if the dedicated semaphore exists. +If not, this one is created, then the shared memory. 
+When a semaphore is rebuilt, normaly the semaphore already exists, and so the dedicated semaphore +and the memory too. +When a process is destroyed, on MacOSX we are sure that all resources are automaticly closed +including thsees two items. + +1 -> Structure of shared memory: + + --------------- fixed array of 'n' Counters --------------- + / \ ++-----------------+----------------------+---/ /---+-------------+-------------+ +| Header | Counter 1 | | Counter N-1 | Counter N | +|-----------------|----------------------| .... |-------------|-------------| +| | | | | | +| n_semlocks | sem_name | | | | +| n_slots | internal_value | | | | +| size_shm | pending_acquires | | | | +| | unlink | | | | +| | ctimestamp(PyDEBUG) | | | | ++-----------------+----------------------+---/ /--+-------------+-------------+ + +The main lock is used to control all shared memory operations as: ++ create, connect to shared mem, remove it, ++ create and rebuild SemlockObject, ++ operations on counter datas as: + + looking for a free slot, + + initialize counter data in a free slot, + + looking for an existing counter data, + + erase a counter data. + +*/ + +#include "semaphore_macosx.h" // CounterObject, HeaderObject, CountersWorkaround + +/* +Datas for each process. +*/ + +static struct _CountersWorkaround +shm_semlock_counters = { + .shm_name = {0}, + .handle_shm = (MEMORY_HANDLE)0, + .shmlock_name = {0}, + .handle_shmlock = (SEM_HANDLE)0, + .header = (HeaderObject *)NULL, + .counters = (CounterObject *)NULL +}; + +/* +SemLockObject with aditionnal members: ++ a mutex to safely handle the associated CounterObject. ++ a pointer to CounterObject (from shared memory array). 
+*/ +typedef struct { + PyObject_HEAD + SEM_HANDLE handle; + unsigned long last_tid; + int count; + int maxvalue; + int kind; + char *name; + /* Additionnal datas for handle MacOSX semaphore */ + SEM_HANDLE handle_mutex; + CounterObject *counter; +} SemLockObject; + +#define _SemLockObject_CAST(op) ((SemLockObject *)(op)) + +#define ISMINE(o) ((o)->count > 0 && PyThread_get_thread_ident() == (o)->last_tid) + +#define ISSEMAPHORE(o) ((o)->maxvalue > 1 && (o)->kind == SEMAPHORE) +#define ISSEMAPHORE_FROM_ARGS(m, k) ((m) > 1 && (k) == SEMAPHORE) + +#include "clinic/semaphore.c.h" + +/* +Build name_shm and name_shmlock from the main process PID. +Called once at module initialization. +*/ +static void +_init_shm_names(pid_t pid) +{ + PyOS_snprintf(shm_semlock_counters.shm_name, + sizeof(shm_semlock_counters.shm_name), + "%s%ld", SHAREDMEM_NAME, (long)pid); + PyOS_snprintf(shm_semlock_counters.shmlock_name, + sizeof(shm_semlock_counters.shmlock_name), + "%s%ld", SHMLOCK_NAME, (long)pid); +} + +/* +Non-static entry point called from multiprocessing_exec(). +Builds names from getpid() and exposes them as module attributes. +*/ +int +_PyMp_init_shm_names(PyObject *module) +{ + _init_shm_names(getpid()); + if (PyModule_AddStringConstant(module, "_MACOSX_SHAREDMEM_NAME", + shm_semlock_counters.shm_name) < 0) + return -1; + if (PyModule_AddStringConstant(module, "_MACOSX_SHMLOCK_NAME", + shm_semlock_counters.shmlock_name) < 0) + return -1; + return 0; +} + +/* +Python-callable: child processes (in spawn.py) call this to get the +names built by the main process instead of generating their own. 
+*/ +PyObject * +_multiprocessing_set_shm_names(PyObject *module, PyObject *args) +{ + const char *shm_name, *gshmlock_name; + if (!PyArg_ParseTuple(args, "ss", &shm_name, &gshmlock_name)) + return NULL; + strncpy(shm_semlock_counters.shm_name, + shm_name, sizeof(shm_semlock_counters.shm_name) - 1); + shm_semlock_counters.shm_name[sizeof(shm_semlock_counters.shm_name) - 1] = '\0'; + strncpy(shm_semlock_counters.shmlock_name, + gshmlock_name, sizeof(shm_semlock_counters.shmlock_name) - 1); + shm_semlock_counters.shmlock_name[sizeof(shm_semlock_counters.shmlock_name) - 1] = '\0'; + Py_RETURN_NONE; +} + +/* +Lock functions +*/ +static int +release_lock(SEM_HANDLE handle) +{ + int res = -1 ; + + errno = 0; + res = sem_post(handle); + if (res < 0) { + PyErr_SetFromErrno(PyExc_OSError); + } + return res; +} + +static int +exist_lock(SEM_HANDLE handle) +{ + int res = -1; + int err = 0; + + if (handle == NULL || handle == SEM_FAILED) { + return 0; + } + + errno = 0; + do { + res = sem_trywait(handle); + err = errno; + } while (res < 0 && (errno == EINTR && !PyErr_CheckSignals())); + + if (res < 0 && (errno == EBADF)) { + return 0; + } + + errno = err; + if (res < 0 && errno == EAGAIN) { + // Couldn't acquire immediately, need to block + do { + Py_BEGIN_ALLOW_THREADS + res = sem_trywait(handle); + Py_END_ALLOW_THREADS + err = errno; + if (res == MP_EXCEPTION_HAS_BEEN_SET) + break; + } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); + } + + if (res == 0) { + if(sem_post(handle) < 0) { + PyErr_SetFromErrno(PyExc_OSError); + return 0; + } + return 1; + } + + return res < 0 && err == EAGAIN ? 1 : 0; +} + +/* +See model from _multiprocessing_SemLock_acquire_impl function. 
+*/ +static int +acquire_lock(SEM_HANDLE handle) +{ + int res = -1; + int err = 0 ; + + /* Check whether we can acquire without releasing the GIL and blocking */ + errno = 0; + do { + res = sem_trywait(handle); + err = errno; + } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); + + errno = err; + if (res < 0 && errno == EAGAIN) { + /* Couldn't acquire immediately, need to block */ + do { + Py_BEGIN_ALLOW_THREADS + res = sem_wait(handle); + Py_END_ALLOW_THREADS + err = errno; + if (res == MP_EXCEPTION_HAS_BEEN_SET) + break; + } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); + } + + if (res < 0) { + errno = err; + PyErr_SetFromErrno(PyExc_OSError); + return -1; + } + return res; +} + +/* +Global Lock functions as: +- create the lock. +- connect to the existing lock and lock it. +- delete the lock. +*/ +static int +create_shmlock_and_lock(const char *sem_name) +{ + SEM_HANDLE sem = SEM_FAILED; + + errno = 0; + // create and lock the semaphore, via initial value set 0) + sem = SEM_CREATE(sem_name, 0, 1); + if (sem == SEM_FAILED) { + return -1; + } + // Created successfully and already acquire. 
+ shm_semlock_counters.handle_shmlock = sem; + return 0; +} + +static int +connect_shmlock_and_lock(const char *sem_name) +{ + SEM_HANDLE sem = SEM_FAILED; + + errno = 0; + sem = sem_open(sem_name, 0); + if (sem == SEM_FAILED) { + return -1; + } + shm_semlock_counters.handle_shmlock = sem; + if (!ACQUIRE_SHMLOCK) { + return -1; + } + return 0; +} + +static int +delete_shmlock(void) { + int res = -1; + + if (shm_semlock_counters.handle_shmlock != SEM_FAILED) { + res = SEM_CLOSE(shm_semlock_counters.handle_shmlock); + shm_semlock_counters.handle_shmlock = SEM_FAILED; + SEM_UNLINK(shm_semlock_counters.shmlock_name); + } + return res; +} + +/* +Shared memory management +*/ +#ifdef HAVE_SYS_MMAN_H +# include // shm_open(), shm_unlink() +#endif + +// Copy from posixshmem.c file +static int +posixshmem_shm_open(char *name, int flags, int mode) +{ + int async_err = 0; + int fd; + do { + Py_BEGIN_ALLOW_THREADS + fd = shm_open(name, flags, mode); + Py_END_ALLOW_THREADS + } while (fd < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals())); + + if (fd < 0) { + return -1; + } + return fd; +} + +/* +shared memory functions +*/ +static int +delete_shm_semlock_counters(int lock_shm) { + int n_opened_sems = -1; + + if (shm_semlock_counters.handle_shmlock != SEM_FAILED) { + //close(shm_semlock_counters.handle_shm); + n_opened_sems = shm_semlock_counters.header->n_semlocks; + if (n_opened_sems == 0) { + munmap(shm_semlock_counters.header, + shm_semlock_counters.header->size_shm); + shm_semlock_counters.header = NULL; + shm_semlock_counters.counters = NULL; + + if (shm_semlock_counters.handle_shm > 0) { + close(shm_semlock_counters.handle_shm); + shm_semlock_counters.handle_shm = (MEMORY_HANDLE)0; + } + shm_unlink(shm_semlock_counters.shm_name); + } + } + return n_opened_sems; +} + +/* +Used only in SemLock_impl +*/ +static int +create_shm_semlock_counters(const char *from_sem_name) { + int oflag = O_RDWR | O_CREAT; + int mode = S_IRUSR | S_IWUSR; + int shm = -1; + int res = -1; 
+ char *datas = NULL; + HeaderObject *header = NULL; + long size_shm = ALIGN_SHM_PAGE(CALC_SIZE_SHM); + + // Link to semaphore and lock immediatly. + if (create_shmlock_and_lock(shm_semlock_counters.shmlock_name) < 0) { + return -1; + } + + // The lock is already acquire, create the shared memory + shm = posixshmem_shm_open(shm_semlock_counters.shm_name, oflag, mode); + if (shm == -1) { + shm_semlock_counters.handle_shm = (MEMORY_HANDLE)0; + return -1; + } + + res = ftruncate(shm, size_shm); + if (res < 0) { + close(shm_semlock_counters.handle_shm); + return -1; + } + + shm_semlock_counters.handle_shm = shm; + datas = (char *)mmap(NULL, + size_shm, + (PROT_WRITE | PROT_READ), + MAP_SHARED, + shm_semlock_counters.handle_shm, + 0L); + if (datas != MAP_FAILED) { + // Set header + shm_semlock_counters.header = (HeaderObject *)datas; + + // Set first slot of array + shm_semlock_counters.counters = (CounterObject *)(datas + sizeof(HeaderObject)); + header = shm_semlock_counters.header; + + header->size_shm = size_shm; + header->n_slots = CALC_NB_SLOTS(size_shm); + header->n_semlocks = 0; + res = 0; + } else { + // error mmap + close(shm_semlock_counters.handle_shm); + shm_semlock_counters.handle_shm = (MEMORY_HANDLE)0; + res = -1; + } + return res; +} + +/* +Used only in SemLock__rebuild_impl +*/ +static int +connect_shm_semlock_counters(const char *from_sem_name) +{ + int oflag = O_RDWR; + int mode = S_IRUSR | S_IWUSR; + int shm = -1; + int res = -1; + char *datas = NULL; + long size_shm = ALIGN_SHM_PAGE(CALC_SIZE_SHM); + + // Link to semaphore and lock immediatly. + errno = 0; + if (connect_shmlock_and_lock(shm_semlock_counters.shmlock_name) < 0) { + return -1; + } + + // The lock is now acquired, connect to the shared memory. 
+ shm = posixshmem_shm_open(shm_semlock_counters.shm_name, oflag, mode); + if (shm == -1) { + shm_semlock_counters.handle_shm = (MEMORY_HANDLE)0; + return -1; + } + + shm_semlock_counters.handle_shm = shm; + datas = (char *)mmap(NULL, + size_shm, + (PROT_WRITE | PROT_READ), + (MAP_SHARED), + shm_semlock_counters.handle_shm, + 0L); + if (datas != MAP_FAILED) { + // Header + shm_semlock_counters.header = (HeaderObject *)datas; + // First slot of array + shm_semlock_counters.counters = (CounterObject *)(datas + sizeof(HeaderObject)); + res = 0; + } else { + // error mmap + close(shm_semlock_counters.handle_shm); + shm_semlock_counters.handle_shm = (MEMORY_HANDLE)0; + res = -1; + } + return res; +} + +/* +Build name of mutex, thus associated with each Semaphore. +Name is unique and create from SemLock python class. +*/ +static char *gh_name = "-gh125828"; + +static char * +build_mutex_name(char *buf, int size, const char *name) +{ + PyOS_snprintf(buf, size, "%s%s", name, gh_name); + return buf; +} + +/* +Search if the semaphore name is already stored in the array of CounterObject. +*/ +static CounterObject* +_search_counter_from_sem_name(const char *sem_name) +{ + int i = 0, j = 0; + HeaderObject *header = shm_semlock_counters.header; + CounterObject *counter = shm_semlock_counters.counters; + + for( ; i < header->n_slots && j < header->n_semlocks; i++, counter++) { + if (counter->sem_name[0] != 0) { + if (!PyOS_stricmp(counter->sem_name, sem_name)) { + return counter; + } + ++j; + } + } + return NULL; +} + +/* +Search for a free slot from the array of CounterObject. +*/ +static CounterObject* +_search_counter_free_slot(void) +{ + int i = 0; + HeaderObject *header = shm_semlock_counters.header; + CounterObject *counter = shm_semlock_counters.counters; + + for( ; i < header->n_slots ; i++, counter++) { + if(counter->sem_name[0] == 0) { + return counter; + } + } + + /* + Not enough memory: see NSEMS_MAX in semaphore_macosx.h. 
+ */ + return NULL; +} + +/* +Connect a Semaphore with an existing CounterObject, from `SemLock__rebuild`. +*/ +static CounterObject * +connect_counter(SemLockObject *self) +{ + CounterObject *counter = _search_counter_from_sem_name(self->name); + if (!counter) { + PyErr_Format(PyExc_ValueError, "Can't find reference to this " + "semaphore: %s", self->name); + } + return counter; +} + +/* +Create a new CounterObject for a Semaphore, from `SemLock_Impl`. +*/ +static CounterObject * +new_counter(SemLockObject *self, const char *name, int value, int unlink) +{ + CounterObject *counter = _search_counter_free_slot(); + if (counter) { + strncpy(counter->sem_name, name, SIZE_SEM_NAME - 1); + counter->sem_name[SIZE_SEM_NAME - 1] = '\0'; + counter->internal_value = value; + counter->unlink = unlink; +#ifdef PyDEBUG + counter->ctimestamp = time(NULL); +#endif + // Update header. + ++shm_semlock_counters.header->n_semlocks; + } else { + PyErr_SetString(PyExc_MemoryError, "Can't allocate more " + "shared memory for MacOSX " + "Semaphore workaround"); + } + return counter; +} + +/* +Remove CounterObject from shared mem. 
+*/ + +static int +remove_counter(CounterObject *counter) +{ + int n_opened_sems = -1; + + memset(counter, 0, sizeof(CounterObject)); + --shm_semlock_counters.header->n_semlocks; + if (shm_semlock_counters.header->n_semlocks == 0) { + n_opened_sems = delete_shm_semlock_counters(false); + } + return n_opened_sems; +} + +#endif /* HAVE_BROKEN_SEM_GETVALUE */ + /*[clinic input] @critical_section _multiprocessing.SemLock.acquire @@ -319,12 +908,10 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking, { int res, err = 0; struct timespec deadline = {0}; - if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) { ++self->count; Py_RETURN_TRUE; } - int use_deadline = (timeout_obj != Py_None); if (use_deadline) { double timeout = PyFloat_AsDouble(timeout_obj); @@ -348,6 +935,17 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking, deadline.tv_nsec %= 1000000000; } +#ifdef HAVE_BROKEN_SEM_GETVALUE + /* + Acquire the counter mutex before sem_trywait so + that the trywait and the internal_value decrement are atomic + with respect to release_impl. 
+ */ + if (ISSEMAPHORE(self) && !ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } +#endif + /* Check whether we can acquire without releasing the GIL and blocking */ do { res = sem_trywait(self->handle); @@ -355,8 +953,45 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking, } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); errno = err; +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE(self)) { + if (res >= 0) { + --self->counter->internal_value; + if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } + } else { + /* + sem_trywait failed (res < 0), we must NOT hold this mutex + while blocking on the semaphore + because release_impl holds it while calling sem_post + — keeping it here would deadlock + */ + if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } + } + // errno could be modified via sem_post + errno = err; + } + #endif + if (res < 0 && errno == EAGAIN && blocking) { - /* Couldn't acquire immediately, need to block */ + /* Couldn't acquire immediately, need to block. */ +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE(self)) { + /* Signal that we are entering the blocking wait so that + release_impl uses (internal_value - pending_acquires) + as the effective semaphore value. 
*/ + if (!ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } + ++self->counter->pending_acquires; + if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } + } +#endif do { Py_BEGIN_ALLOW_THREADS if (!use_deadline) { @@ -370,24 +1005,39 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking, if (res == MP_EXCEPTION_HAS_BEEN_SET) break; } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); - } +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE(self)) { + if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) { + --self->counter->pending_acquires; + if (res >= 0) + --self->counter->internal_value; + if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } + } else { + return NULL; + } + } +#endif + } if (res < 0) { errno = err; - if (errno == EAGAIN || errno == ETIMEDOUT) + if (errno == EAGAIN || errno == ETIMEDOUT) { Py_RETURN_FALSE; - else if (errno == EINTR) + } + if (errno == EINTR) { return NULL; - else - return PyErr_SetFromErrno(PyExc_OSError); + } + return PyErr_SetFromErrno(PyExc_OSError); } ++self->count; self->last_tid = PyThread_get_thread_ident(); - Py_RETURN_TRUE; } + /*[clinic input] @critical_section _multiprocessing.SemLock.release @@ -402,8 +1052,8 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self) if (self->kind == RECURSIVE_MUTEX) { if (!ISMINE(self)) { PyErr_SetString(PyExc_AssertionError, "attempt to " - "release recursive lock not owned " - "by thread"); + "release recursive lock " + "not owned by thread"); return NULL; } if (self->count > 1) { @@ -433,12 +1083,28 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self) "times"); return NULL; } + } else { + int sval = -1; + if (ISSEMAPHORE(self)) { + if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) { + sval = self->counter->internal_value + - (int)self->counter->pending_acquires; + } else { + return NULL; + } + + if (sval >= self->maxvalue) { + RELEASE_COUNTER_MUTEX(self->handle_mutex); + PyErr_SetString(PyExc_ValueError, 
"semaphore or lock " + "released too many times"); + return NULL; + } + } } -#else +#else /* HAVE_BROKEN_SEM_GETVALUE */ int sval; - /* This check is not an absolute guarantee that the semaphore - does not rise above maxvalue. */ + does not rise above maxvalue. */ if (sem_getvalue(self->handle, &sval) < 0) { return PyErr_SetFromErrno(PyExc_OSError); } else if (sval >= self->maxvalue) { @@ -452,6 +1118,15 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self) if (sem_post(self->handle) < 0) return PyErr_SetFromErrno(PyExc_OSError); +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE(self)) { + ++self->counter->internal_value; + if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } + } +#endif + --self->count; Py_RETURN_NONE; } @@ -475,6 +1150,11 @@ newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue, self->last_tid = 0; self->maxvalue = maxvalue; self->name = name; + +#ifdef HAVE_BROKEN_SEM_GETVALUE + self->handle_mutex = SEM_FAILED; + self->counter = NULL; +#endif return (PyObject*)self; } @@ -499,6 +1179,13 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value, PyObject *result; char *name_copy = NULL; +#ifdef HAVE_BROKEN_SEM_GETVALUE + char mutex_name[SIZE_MUTEX_NAME]; + SemLockObject *semlock = NULL; + SEM_HANDLE handle_mutex = SEM_FAILED; + CounterObject *counter = NULL; +#endif + if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) { PyErr_SetString(PyExc_ValueError, "unrecognized kind"); return NULL; @@ -512,12 +1199,26 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value, strcpy(name_copy, name); } +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE_FROM_ARGS(maxvalue, kind)) { + if (!EXIST_SHMLOCK) { + if (create_shm_semlock_counters(name) < 0) { + RELEASE_SHMLOCK; + goto failure; + } + } else { + if (!ACQUIRE_SHMLOCK) { + goto failure; + } + } + } +#endif /* HAVE_BROKEN_SEM_GETVALUE*/ + SEM_CLEAR_ERROR(); handle = SEM_CREATE(name, value, maxvalue); /* On Windows we should 
fail if GetLastError()==ERROR_ALREADY_EXISTS */ if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0) goto failure; - if (unlink && SEM_UNLINK(name) < 0) goto failure; @@ -525,6 +1226,36 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value, if (!result) goto failure; +#ifdef HAVE_BROKEN_SEM_GETVALUE + semlock = _SemLockObject_CAST(result); + if (ISSEMAPHORE(semlock)) { + handle_mutex = SEM_CREATE(build_mutex_name(mutex_name, SIZE_MUTEX_NAME, name), 1, 1); + if (handle_mutex != SEM_FAILED) { + // Counter must exist + counter = new_counter(semlock, name, value, unlink); + if(counter) { + semlock->handle_mutex = handle_mutex; + semlock->counter = counter; + // unlink + if (unlink && SEM_UNLINK(mutex_name) < 0) + goto failure; + } + } + if (!RELEASE_SHMLOCK) { + goto failure; + } + + if (!counter) { + PyObject_GC_UnTrack(semlock); + type->tp_free(semlock); + Py_DECREF(type); + goto failure; + } + if (handle_mutex == SEM_FAILED) + goto failure; + } +#endif /* HAVE_BROKEN_SEM_GETVALUE*/ + return result; failure: @@ -533,6 +1264,26 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value, } if (handle != SEM_FAILED) SEM_CLOSE(handle); + +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE(semlock)) { + int n_opened_sems = -1; + + if (handle_mutex != SEM_FAILED) { + SEM_CLOSE(handle_mutex); + } + if (counter) { + if(ACQUIRE_SHMLOCK) { + n_opened_sems = remove_counter(counter); + RELEASE_SHMLOCK; + } + if (!n_opened_sems) { + (void)delete_shmlock(); + } + } + } +#endif /* HAVE_BROKEN_SEM_GETVALUE*/ + PyMem_Free(name_copy); return NULL; } @@ -555,14 +1306,36 @@ _multiprocessing_SemLock__rebuild_impl(PyTypeObject *type, SEM_HANDLE handle, const char *name) /*[clinic end generated code: output=2aaee14f063f3bd9 input=f7040492ac6d9962]*/ { + PyObject *result = NULL; char *name_copy = NULL; +#ifdef HAVE_BROKEN_SEM_GETVALUE + char mutex_name[SIZE_MUTEX_NAME]; + SemLockObject *semlock = NULL; + SEM_HANDLE handle_mutex = SEM_FAILED; + CounterObject 
*counter = NULL; +#endif + if (name != NULL) { name_copy = PyMem_Malloc(strlen(name) + 1); if (name_copy == NULL) return PyErr_NoMemory(); strcpy(name_copy, name); } +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE_FROM_ARGS(maxvalue, kind)) { + if (!EXIST_SHMLOCK) { + if (connect_shm_semlock_counters(name) < 0) { + RELEASE_SHMLOCK; + goto failure; + } + } else { + if (!ACQUIRE_SHMLOCK) { + goto failure; + } + } + } +#endif /* HAVE_BROKEN_SEM_GETVALUE*/ #ifndef MS_WINDOWS if (name != NULL) { @@ -573,20 +1346,91 @@ _multiprocessing_SemLock__rebuild_impl(PyTypeObject *type, SEM_HANDLE handle, return NULL; } } -#endif +#endif /* !MS_WINDOWS */ + + result = newsemlockobject(type, handle, kind, maxvalue, name_copy); + +#ifdef HAVE_BROKEN_SEM_GETVALUE + semlock = _SemLockObject_CAST(result); + if (ISSEMAPHORE(semlock)) { + handle_mutex = sem_open(build_mutex_name(mutex_name, SIZE_MUTEX_NAME, name), 0); + if (handle_mutex != SEM_FAILED) { + counter = connect_counter(semlock); + if (counter) { + semlock->counter = counter; + semlock->handle_mutex = handle_mutex; + } + } + if(!RELEASE_SHMLOCK) { + goto failure; + } + PyErr_Clear(); + if (counter && handle_mutex != SEM_FAILED) { + return result; + } + + failure: + if (handle_mutex == SEM_FAILED) { + if (handle != SEM_FAILED) { + SEM_CLOSE(handle); + } + PyErr_SetFromErrno(PyExc_OSError); + } + + if (semlock->handle != SEM_FAILED) { + SEM_CLOSE(semlock->handle); + } + if (semlock->handle_mutex != SEM_FAILED) { + SEM_CLOSE(semlock->handle_mutex); - return newsemlockobject(type, handle, kind, maxvalue, name_copy); + } + PyObject_GC_UnTrack(semlock); + type->tp_free(semlock); + Py_DECREF(type); + if (!PyErr_Occurred()) { + PyErr_SetFromErrno(PyExc_OSError); + } + PyMem_Free(name_copy); + return NULL; + } +#endif /* HAVE_BROKEN_SEM_GETVALUE */ + + return result; } static void semlock_dealloc(PyObject *op) { - SemLockObject *self = _SemLockObject_CAST(op); + SemLockObject* self = _SemLockObject_CAST(op); PyTypeObject *tp = 
Py_TYPE(self); PyObject_GC_UnTrack(self); - if (self->handle != SEM_FAILED) + if (self->handle != SEM_FAILED) { SEM_CLOSE(self->handle); - PyMem_Free(self->name); + } + +#ifdef HAVE_BROKEN_SEM_GETVALUE + if (ISSEMAPHORE(self)) { + int n_opened_sems = -1; + if (self->handle_mutex != SEM_FAILED) { + SEM_CLOSE(self->handle_mutex); + } + if (EXIST_SHMLOCK && ACQUIRE_SHMLOCK) { + if (self->counter->unlink == 1) { + // case of 'fork' start method. + n_opened_sems = remove_counter(self->counter); + } + RELEASE_SHMLOCK; + if (!n_opened_sems) { + (void)delete_shmlock(); + } + } + self->counter = NULL; + } +#endif /* HAVE_BROKEN_SEM_GETVALUE */ + + if (self->name) { + PyMem_Free(self->name); + } tp->tp_free(self); Py_DECREF(tp); } @@ -620,55 +1464,82 @@ _multiprocessing_SemLock__is_mine_impl(SemLockObject *self) } /*[clinic input] -_multiprocessing.SemLock._get_value +@critical_section +_multiprocessing.SemLock._is_zero -Get the value of the semaphore. +Return whether semaphore has value zero. [clinic start generated code]*/ static PyObject * -_multiprocessing_SemLock__get_value_impl(SemLockObject *self) -/*[clinic end generated code: output=64bc1b89bda05e36 input=cb10f9a769836203]*/ +_multiprocessing_SemLock__is_zero_impl(SemLockObject *self) +/*[clinic end generated code: output=815d4c878c806ed7 input=7401329b1f0f059c]*/ { #ifdef HAVE_BROKEN_SEM_GETVALUE - PyErr_SetNone(PyExc_NotImplementedError); - return NULL; + if (sem_trywait(self->handle) < 0) { + if (errno == EAGAIN) + Py_RETURN_TRUE; + return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + } else { + if (sem_post(self->handle) < 0) + return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + Py_RETURN_FALSE; + } #else int sval; if (SEM_GETVALUE(self->handle, &sval) < 0) return _PyMp_SetError(NULL, MP_STANDARD_ERROR); - /* some posix implementations use negative numbers to indicate - the number of waiting threads */ - if (sval < 0) - sval = 0; - return PyLong_FromLong((long)sval); + return PyBool_FromLong((long)sval == 0); #endif 
} /*[clinic input] -_multiprocessing.SemLock._is_zero +@critical_section +_multiprocessing.SemLock._get_value -Return whether semaphore has value zero. +Get the value of the semaphore. [clinic start generated code]*/ static PyObject * -_multiprocessing_SemLock__is_zero_impl(SemLockObject *self) -/*[clinic end generated code: output=815d4c878c806ed7 input=294a446418d31347]*/ +_multiprocessing_SemLock__get_value_impl(SemLockObject *self) +/*[clinic end generated code: output=64bc1b89bda05e36 input=b900f9766c864d35]*/ { + int sval = -1; + #ifdef HAVE_BROKEN_SEM_GETVALUE - if (sem_trywait(self->handle) < 0) { - if (errno == EAGAIN) - Py_RETURN_TRUE; - return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + if (self->maxvalue <= 1) { + PyObject *is_zero = _multiprocessing_SemLock__is_zero_impl(self); + if (is_zero == NULL) { + return NULL; + } + long val = (long)Py_IsFalse(is_zero); + Py_DECREF(is_zero); + return PyLong_FromLong(val); + } + + // error is set in ACQUIRE/RELEASE_* macros. + if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) { + sval = self->counter->internal_value + - (int)self->counter->pending_acquires; + if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) { + return NULL; + } } else { - if (sem_post(self->handle) < 0) - return _PyMp_SetError(NULL, MP_STANDARD_ERROR); - Py_RETURN_FALSE; + return NULL; } + if (sval < 0) { + sval = 0; + } + return PyLong_FromLong((long)sval); #else - int sval; - if (SEM_GETVALUE(self->handle, &sval) < 0) + if (SEM_GETVALUE(self->handle, &sval) < 0) { return _PyMp_SetError(NULL, MP_STANDARD_ERROR); - return PyBool_FromLong((long)sval == 0); + } + /* some posix implementations use negative numbers to indicate + the number of waiting threads */ + if (sval < 0) { + sval = 0; + } + return PyLong_FromLong((long)sval); #endif } @@ -785,7 +1656,7 @@ PyType_Spec _PyMp_SemLockType_spec = { * Function to unlink semaphore names */ -PyObject * + PyObject * _PyMp_sem_unlink(const char *name) { if (SEM_UNLINK(name) < 0) { @@ -793,6 +1664,31 @@ 
_PyMp_sem_unlink(const char *name) return NULL; } +#ifdef HAVE_BROKEN_SEM_GETVALUE + char mutex_name[SIZE_MUTEX_NAME]; + CounterObject *counter = NULL; + int n_opened_sems = -1; + + if (SEM_UNLINK(build_mutex_name(mutex_name, SIZE_MUTEX_NAME, name)) == 0) { + /* + Here we could remove each associated semaphore counter when + multiprocessing start methods are 'spawn' or 'forkserver' + - unlink flag is always set to 0. + In the "fork" start method, this function is never called. + */ + if (EXIST_SHMLOCK && ACQUIRE_SHMLOCK) { + counter = _search_counter_from_sem_name(name); + if (counter) { + n_opened_sems = remove_counter(counter); + } + RELEASE_SHMLOCK; + if (!n_opened_sems) { + (void)delete_shmlock(); + } + } + } +#endif /* HAVE_BROKEN_SEM_GETVALUE */ + Py_RETURN_NONE; } diff --git a/Modules/_multiprocessing/semaphore_macosx.h b/Modules/_multiprocessing/semaphore_macosx.h new file mode 100644 index 00000000000000..a93381677143a9 --- /dev/null +++ b/Modules/_multiprocessing/semaphore_macosx.h @@ -0,0 +1,71 @@ +#ifndef SEMAPHORE_MACOSX_H +#define SEMAPHORE_MACOSX_H + +#include <unistd.h> // sysconf(SC_PAGESIZE) +#include <sys/mman.h> // shm_open, shm_unlink + +/* +Structures and constants in shared memory +*/ + +#define SIZE_SEM_NAME 16 +#define SIZE_MUTEX_NAME (SIZE_SEM_NAME<<1) /* "/mp-xxxxxxxx" (12) + "-gh125828" */ + +typedef struct { + int n_semlocks; // Current number of semaphores. Starts at 0. + int n_slots; // Current slots in the counter array. + int size_shm; // Size of allocated shared memory (this and N counters). +} HeaderObject; + +typedef struct { + char sem_name[SIZE_SEM_NAME]; // Name of semaphore. + short internal_value; // Internal value of semaphore, updated on each acquire/release. + short pending_acquires; // Threads blocked in sem_wait that have not yet decremented internal_value. + short unlink; // Indicates whether this semaphore is already unlinked. +#ifdef PyDEBUG + time_t ctimestamp; // Created timestamp (debug log). 
+#endif +} CounterObject; + +/* +Structure, constants and macros for static memory: +*/ + +#define NSEMS_MAX sysconf(_SC_SEM_NSEMS_MAX) // returns 87381 on macOS 15.1 (M4 Pro). +#define CALC_SIZE_SHM (NSEMS_MAX * sizeof(CounterObject)) + sizeof(HeaderObject) +#define SC_PAGESIZE sysconf(_SC_PAGESIZE) +#define ALIGN_SHM_PAGE(n) ((int)((n)/SC_PAGESIZE)+1)*SC_PAGESIZE + +#define CALC_NB_SLOTS(n) (int)((((n)) - sizeof(HeaderObject)) / sizeof(CounterObject)) + +#define SHAREDMEM_NAME "/psm-gh125828-" +#define SHMLOCK_NAME "/mp-gh125828-" + +#define SIZE_SHM_NAME (SIZE_SEM_NAME<<1) /* "/psm-gh125828-" (14) + 6 digits PID + '\0' */ +#define SIZE_SHMLOCK_NAME (SIZE_SEM_NAME<<1) /* "/mp-gh125828-" (13) + 6 digits PID + '\0' */ + +typedef int MEMORY_HANDLE; + +struct _CountersWorkaround{ + /*-- global data --*/ + char shm_name[SIZE_SHM_NAME]; + MEMORY_HANDLE handle_shm; // Memory handle. + char shmlock_name[SIZE_SHMLOCK_NAME]; + SEM_HANDLE handle_shmlock; // Global memory lock to handle shared memory. + /*-- Pointers to shared memory --*/ + HeaderObject *header; // Pointer to header (shared memory). + CounterObject*counters; // Pointer to the first item of fixed array (shared memory). +}; + +/* +Lock and mutex macros +*/ + +#define EXIST_SHMLOCK (exist_lock(shm_semlock_counters.handle_shmlock) == 1) +#define ACQUIRE_SHMLOCK (acquire_lock(shm_semlock_counters.handle_shmlock) == 0) +#define RELEASE_SHMLOCK (release_lock(shm_semlock_counters.handle_shmlock) == 0) + +#define ACQUIRE_COUNTER_MUTEX(h) (acquire_lock((h)) == 0) +#define RELEASE_COUNTER_MUTEX(h) (release_lock((h)) == 0) + +#endif /* SEMAPHORE_MACOSX_H */