diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index c0fa1fccbe1..f96505630ae 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -97,6 +97,13 @@ def Manager(): m.start() return m +def Pipe(duplex=True): + ''' + Returns two connection object connected by a pipe + ''' + from multiprocessing.connection import Pipe + return Pipe(duplex) + def cpu_count(): ''' Returns the number of CPUs in the system @@ -131,28 +138,134 @@ def freeze_support(): from multiprocessing.forking import freeze_support freeze_support() +def get_logger(): + ''' + Return package logger -- if it does not already exist then it is created + ''' + from multiprocessing.util import get_logger + return get_logger() + +def log_to_stderr(level=None): + ''' + Turn on logging and add a handler which prints to stderr + ''' + from multiprocessing.util import log_to_stderr + return log_to_stderr(level) + def allow_connection_pickling(): ''' Install support for sending connections and sockets between processes ''' from multiprocessing import reduction -# Alias some names from submodules in the package namespace -from multiprocessing.connection import Pipe -from multiprocessing.util import (get_logger, log_to_stderr) - # # Definitions depending on native semaphores # -# Alias some names from submodules in the package namespace -from multiprocessing.synchronize import (Lock, RLock, Condition, Event, - Semaphore, BoundedSemaphore) -from multiprocessing.queues import (Queue, JoinableQueue) -from multiprocessing.pool import Pool -from multiprocessing.sharedctypes import (RawValue, Value, - RawArray, Array) + +def Lock(): + ''' + Returns a non-recursive lock object + ''' + from multiprocessing.synchronize import Lock + return Lock() + +def RLock(): + ''' + Returns a recursive lock object + ''' + from multiprocessing.synchronize import RLock + return RLock() + +def Condition(lock=None): + ''' + Returns a condition object + ''' + from multiprocessing.synchronize import Condition + return Condition(lock) + +def Semaphore(value=1): + ''' + Returns a semaphore object + ''' + from multiprocessing.synchronize import Semaphore + return Semaphore(value) + +def BoundedSemaphore(value=1): + ''' + Returns a bounded semaphore object + ''' + from multiprocessing.synchronize import BoundedSemaphore + return BoundedSemaphore(value) + +def Event(): + ''' + Returns an event object + ''' + from multiprocessing.synchronize import Event + return Event() + +def Queue(maxsize=0): + ''' + Returns a queue object + ''' + from multiprocessing.queues import Queue + return Queue(maxsize) + +def JoinableQueue(maxsize=0): + ''' + Returns a queue object + ''' + from multiprocessing.queues import JoinableQueue + return JoinableQueue(maxsize) + +def Pool(processes=None, initializer=None, initargs=()): + ''' + Returns a process pool object + ''' + from multiprocessing.pool import Pool + return Pool(processes, initializer, initargs) + +def RawValue(typecode_or_type, *args): + ''' + Returns a shared object + ''' + from multiprocessing.sharedctypes import RawValue + return RawValue(typecode_or_type, *args) + +def RawArray(typecode_or_type, size_or_initializer): + ''' + Returns a shared array + ''' + from multiprocessing.sharedctypes import RawArray + return RawArray(typecode_or_type, size_or_initializer) + +def Value(typecode_or_type, *args, **kwds): + ''' + Returns a synchronized shared object + ''' + from multiprocessing.sharedctypes import Value + return Value(typecode_or_type, *args, **kwds) + +def Array(typecode_or_type, size_or_initializer, **kwds): + ''' + Returns a synchronized shared array + ''' + from multiprocessing.sharedctypes import Array + return Array(typecode_or_type, size_or_initializer, **kwds) + +# +# +# if sys.platform == 'win32': - from multiprocessing.forking import set_executable + + def set_executable(executable): + ''' + Sets the path to a python.exe or pythonw.exe binary used to run + child processes on Windows instead of sys.executable. + Useful for people embedding Python. + ''' + from multiprocessing.forking import set_executable + set_executable(executable) __all__ += ['set_executable'] diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 256e5724596..f32dab19afe 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -371,7 +371,13 @@ class Server(object): self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) if ident not in self.id_to_refcount: - self.id_to_refcount[ident] = None + self.id_to_refcount[ident] = 0 + # increment the reference count immediately, to avoid + # this object being garbage collected before a Proxy + # object for it can be created. The caller of create() + # is responsible for doing a decref once the Proxy object + # has been created. + self.incref(c, ident) return ident, tuple(exposed) finally: self.mutex.release() @@ -393,11 +399,7 @@ class Server(object): def incref(self, c, ident): self.mutex.acquire() try: - try: - self.id_to_refcount[ident] += 1 - except TypeError: - assert self.id_to_refcount[ident] is None - self.id_to_refcount[ident] = 1 + self.id_to_refcount[ident] += 1 finally: self.mutex.release() @@ -634,6 +636,8 @@ class BaseManager(object): token, self._serializer, manager=self, authkey=self._authkey, exposed=exp ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) return proxy temp.__name__ = typeid setattr(cls, typeid, temp) @@ -726,10 +730,13 @@ class BaseProxy(object): elif kind == '#PROXY': exposed, token = result proxytype = self._manager._registry[token.typeid][-1] - return proxytype( + proxy = proxytype( token, self._serializer, manager=self._manager, authkey=self._authkey, exposed=exposed ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) + return proxy raise convert_to_error(kind, result) def _getvalue(self): diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 5ca8fb8bfbc..0054ff1822e 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -63,7 +63,7 @@ def RawArray(typecode_or_type, size_or_initializer): def Value(typecode_or_type, *args, **kwds): ''' - Return a synchronization wrapper for a RawValue + Return a synchronization wrapper for a Value ''' lock = kwds.pop('lock', None) if kwds: diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 3e21dfec853..428656a5b15 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -65,9 +65,7 @@ class SemLock(object): # class Semaphore(SemLock): - ''' - A semaphore object - ''' + def __init__(self, value=1): SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) @@ -86,9 +84,7 @@ class Semaphore(SemLock): # class BoundedSemaphore(Semaphore): - ''' - A bounded semaphore object - ''' + def __init__(self, value=1): SemLock.__init__(self, SEMAPHORE, value, value) @@ -105,9 +101,7 @@ class BoundedSemaphore(Semaphore): # class Lock(SemLock): - ''' - A non-recursive lock object - ''' + def __init__(self): SemLock.__init__(self, SEMAPHORE, 1, 1) @@ -132,9 +126,7 @@ class Lock(SemLock): # class RLock(SemLock): - ''' - A recursive lock object - ''' + def __init__(self): SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) @@ -160,9 +152,6 @@ class RLock(SemLock): # class Condition(object): - ''' - A condition object - ''' def __init__(self, lock=None): self._lock = lock or RLock() @@ -263,9 +252,7 @@ class Condition(object): # class Event(object): - ''' - An event object - ''' + def __init__(self): self._cond = Condition(Lock()) self._flag = Semaphore(0) diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index c3e811c4dad..7d53512725c 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -54,7 +54,7 @@ def sub_warning(msg, *args): def get_logger(): ''' - Return package logger -- if it does not already exist then it is created + Returns logger used by multiprocessing ''' global _logger