Issue #8411: new condition variable emulation under Windows for the new GIL,

by Kristján.  Unfortunately the 3.x Windows buildbots are in a wreck, so we'll
have to watch them when they become fit again.
This commit is contained in:
Antoine Pitrou 2010-08-10 13:48:51 +00:00
parent 817c9df7e5
commit e1dd1747e8
1 changed files with 103 additions and 50 deletions

View File

@ -106,7 +106,6 @@ do { \
#define COND_INIT(cond) \
if (pthread_cond_init(&cond, NULL)) { \
Py_FatalError("pthread_cond_init(" #cond ") failed"); };
#define COND_RESET(cond)
#define COND_SIGNAL(cond) \
if (pthread_cond_signal(&cond)) { \
Py_FatalError("pthread_cond_signal(" #cond ") failed"); };
@ -141,64 +140,120 @@ do { \
#include <windows.h>
#define MUTEX_T HANDLE
#define MUTEX_INIT(mut) \
if (!(mut = CreateMutex(NULL, FALSE, NULL))) { \
Py_FatalError("CreateMutex(" #mut ") failed"); };
#define MUTEX_T CRITICAL_SECTION
#define MUTEX_INIT(mut) do { \
if (!(InitializeCriticalSectionAndSpinCount(&(mut), 4000))) \
Py_FatalError("CreateMutex(" #mut ") failed"); \
} while (0)
#define MUTEX_FINI(mut) \
DeleteCriticalSection(&(mut))
#define MUTEX_LOCK(mut) \
if (WaitForSingleObject(mut, INFINITE) != WAIT_OBJECT_0) { \
Py_FatalError("WaitForSingleObject(" #mut ") failed"); };
EnterCriticalSection(&(mut))
#define MUTEX_UNLOCK(mut) \
if (!ReleaseMutex(mut)) { \
Py_FatalError("ReleaseMutex(" #mut ") failed"); };
LeaveCriticalSection(&(mut))
/* We emulate condition variables with events. It is sufficient here.
WaitForMultipleObjects() allows the event to be caught and the mutex
to be taken atomically.
As for SignalObjectAndWait(), its semantics are unfortunately a bit
more foggy. Many sources on the Web define it as atomically releasing
the first object while starting to wait on the second, but MSDN states
it is *not* atomic...
/* We emulate condition variables with a semaphore.
We use a Semaphore rather than an auto-reset event, because although
an auto-resent event might appear to solve the lost-wakeup bug (race
condition between releasing the outer lock and waiting) because it
maintains state even though a wait hasn't happened, there is still
a lost wakeup problem if more than one thread are interrupted in the
critical place. A semaphore solves that.
Because it is ok to signal a condition variable with no one
waiting, we need to keep track of the number of
waiting threads. Otherwise, the semaphore's state could rise
without bound.
In any case, the emulation here is tailored for our particular use case.
For example, we don't care how many threads are woken up when a condition
gets signalled. Generic emulations of the pthread_cond_* API using
Generic emulations of the pthread_cond_* API using
Win32 functions can be found on the Web.
The following read can be edificating (or not):
http://www.cse.wustl.edu/~schmidt/win32-cv-1.html
*/
#define COND_T HANDLE
typedef struct COND_T
{
HANDLE sem; /* the semaphore */
int n_waiting; /* how many are unreleased */
} COND_T;
__inline static void _cond_init(COND_T *cond)
{
/* A semaphore with a large max value, The positive value
* is only needed to catch those "lost wakeup" events and
* race conditions when a timed wait elapses.
*/
if (!(cond->sem = CreateSemaphore(NULL, 0, 1000, NULL)))
Py_FatalError("CreateSemaphore() failed");
cond->n_waiting = 0;
}
__inline static void _cond_fini(COND_T *cond)
{
BOOL ok = CloseHandle(cond->sem);
if (!ok)
Py_FatalError("CloseHandle() failed");
}
__inline static void _cond_wait(COND_T *cond, MUTEX_T *mut)
{
++cond->n_waiting;
MUTEX_UNLOCK(*mut);
/* "lost wakeup bug" would occur if the caller were interrupted here,
* but we are safe because we are using a semaphore wich has an internal
* count.
*/
if (WaitForSingleObject(cond->sem, INFINITE) == WAIT_FAILED)
Py_FatalError("WaitForSingleObject() failed");
MUTEX_LOCK(*mut);
}
__inline static int _cond_timed_wait(COND_T *cond, MUTEX_T *mut,
int us)
{
DWORD r;
++cond->n_waiting;
MUTEX_UNLOCK(*mut);
r = WaitForSingleObject(cond->sem, us / 1000);
if (r == WAIT_FAILED)
Py_FatalError("WaitForSingleObject() failed");
MUTEX_LOCK(*mut);
if (r == WAIT_TIMEOUT)
--cond->n_waiting;
/* Here we have a benign race condition with _cond_signal. If the
* wait operation has timed out, but before we can acquire the
* mutex again to decrement n_waiting, a thread holding the mutex
* still sees a positive n_waiting value and may call
* ReleaseSemaphore and decrement n_waiting.
* This will cause n_waiting to be decremented twice.
* This is benign, though, because ReleaseSemaphore will also have
* been called, leaving the semaphore state positive. We may
* thus end up with semaphore in state 1, and n_waiting == -1, and
* the next time someone calls _cond_wait(), that thread will
* pass right through, decrementing the semaphore state and
* incrementing n_waiting, thus correcting the extra _cond_signal.
*/
return r == WAIT_TIMEOUT;
}
__inline static void _cond_signal(COND_T *cond) {
/* NOTE: This must be called with the mutex held */
if (cond->n_waiting > 0) {
if (!ReleaseSemaphore(cond->sem, 1, NULL))
Py_FatalError("ReleaseSemaphore() failed");
--cond->n_waiting;
}
}
#define COND_INIT(cond) \
/* auto-reset, non-signalled */ \
if (!(cond = CreateEvent(NULL, FALSE, FALSE, NULL))) { \
Py_FatalError("CreateMutex(" #cond ") failed"); };
#define COND_RESET(cond) \
if (!ResetEvent(cond)) { \
Py_FatalError("ResetEvent(" #cond ") failed"); };
_cond_init(&(cond))
#define COND_FINI(cond) \
_cond_fini(&(cond))
#define COND_SIGNAL(cond) \
if (!SetEvent(cond)) { \
Py_FatalError("SetEvent(" #cond ") failed"); };
_cond_signal(&(cond))
#define COND_WAIT(cond, mut) \
{ \
if (SignalObjectAndWait(mut, cond, INFINITE, FALSE) != WAIT_OBJECT_0) \
Py_FatalError("SignalObjectAndWait(" #mut ", " #cond") failed"); \
MUTEX_LOCK(mut); \
}
#define COND_TIMED_WAIT(cond, mut, microseconds, timeout_result) \
{ \
DWORD r; \
HANDLE objects[2] = { cond, mut }; \
MUTEX_UNLOCK(mut); \
r = WaitForMultipleObjects(2, objects, TRUE, microseconds / 1000); \
if (r == WAIT_TIMEOUT) { \
MUTEX_LOCK(mut); \
timeout_result = 1; \
} \
else if (r != WAIT_OBJECT_0) \
Py_FatalError("WaitForSingleObject(" #cond ") failed"); \
else \
timeout_result = 0; \
}
_cond_wait(&(cond), &(mut))
#define COND_TIMED_WAIT(cond, mut, us, timeout_result) do { \
(timeout_result) = _cond_timed_wait(&(cond), &(mut), us); \
} while (0)
#else
@ -282,7 +337,6 @@ static void drop_gil(PyThreadState *tstate)
the GIL and drop it again, and reset the condition
before we even had a chance to wait for it. */
COND_WAIT(switch_cond, switch_mutex);
COND_RESET(switch_cond);
}
MUTEX_UNLOCK(switch_mutex);
}
@ -301,7 +355,6 @@ static void take_gil(PyThreadState *tstate)
if (!_Py_atomic_load_relaxed(&gil_locked))
goto _ready;
COND_RESET(gil_cond);
while (_Py_atomic_load_relaxed(&gil_locked)) {
int timed_out = 0;
unsigned long saved_switchnum;