Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.

This commit is contained in:
Charles-François Natali 2013-03-25 18:20:40 +01:00
parent bb25b6fc94
commit 84e4316489
2 changed files with 28 additions and 43 deletions

View File

@ -22,7 +22,7 @@ import _multiprocessing
from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
from multiprocessing.forking import assert_spawning, ForkingPickler
#
# Queue type using a pipe, buffer and thread
@ -69,8 +69,8 @@ class Queue(object):
self._joincancelled = False
self._closed = False
self._close = None
self._send = self._writer.send
self._recv = self._reader.recv
self._send_bytes = self._writer.send_bytes
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None):
@ -89,14 +89,9 @@ class Queue(object):
def get(self, block=True, timeout=None):
if block and timeout is None:
self._rlock.acquire()
try:
res = self._recv()
self._sem.release()
return res
finally:
self._rlock.release()
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.time() + timeout
@ -109,11 +104,12 @@ class Queue(object):
raise Empty
elif not self._poll():
raise Empty
res = self._recv()
res = self._recv_bytes()
self._sem.release()
return res
finally:
self._rlock.release()
# unserialize the data after having released the lock
return ForkingPickler.loads(res)
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@ -158,7 +154,7 @@ class Queue(object):
self._buffer.clear()
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
@ -210,7 +206,7 @@ class Queue(object):
notempty.release()
@staticmethod
def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@ -241,16 +237,14 @@ class Queue(object):
close()
return
# serialize the data before acquiring the lock
obj = ForkingPickler.dumps(obj)
if wacquire is None:
send(obj)
# Delete references to object. See issue16284
del obj
send_bytes(obj)
else:
wacquire()
try:
send(obj)
# Delete references to object. See issue16284
del obj
send_bytes(obj)
finally:
wrelease()
except IndexError:
@ -344,7 +338,6 @@ class SimpleQueue(object):
self._wlock = None
else:
self._wlock = Lock()
self._make_methods()
def empty(self):
return not self._poll()
@ -355,29 +348,19 @@ class SimpleQueue(object):
def __setstate__(self, state):
(self._reader, self._writer, self._rlock, self._wlock) = state
self._make_methods()
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
def get():
racquire()
try:
return recv()
finally:
rrelease()
self.get = get
def get(self):
with self._rlock:
res = self._reader.recv_bytes()
# unserialize the data after having released the lock
return ForkingPickler.loads(res)
def put(self, obj):
# serialize the data before acquiring the lock
obj = ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
self.put = self._writer.send
self._writer.send_bytes(obj)
else:
send = self._writer.send
wacquire, wrelease = self._wlock.acquire, self._wlock.release
def put(obj):
wacquire()
try:
return send(obj)
finally:
wrelease()
self.put = put
with self._wlock:
self._writer.send_bytes(obj)

View File

@ -294,6 +294,8 @@ Core and Builtins
Library
-------
- Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.
- Issue #17536: Add to webbrowser's browser list: www-browser, x-www-browser,
iceweasel, iceape.