issue3352: clean up the multiprocessing API to remove many get_/set_ methods and convert them to properties. Update the docs and the examples included.

This commit is contained in:
Jesse Noller 2008-08-19 19:06:19 +00:00
parent 7c972f971c
commit 5bc9f4c09c
15 changed files with 81 additions and 87 deletions

View File

@ -152,7 +152,7 @@ class DistributedPool(pool.Pool):
def LocalProcess(**kwds): def LocalProcess(**kwds):
p = Process(**kwds) p = Process(**kwds)
p.set_name('localhost/' + p.get_name()) p.set_name('localhost/' + p.name)
return p return p
class Cluster(managers.SyncManager): class Cluster(managers.SyncManager):

View File

@ -14,7 +14,7 @@ import sys
def calculate(func, args): def calculate(func, args):
result = func(*args) result = func(*args)
return '%s says that %s%s = %s' % ( return '%s says that %s%s = %s' % (
multiprocessing.current_process().get_name(), multiprocessing.current_process().name,
func.__name__, args, result func.__name__, args, result
) )

View File

@ -224,7 +224,7 @@ def test_sharedvalues():
p.start() p.start()
p.join() p.join()
assert p.get_exitcode() == 0 assert p.exitcode == 0
#### ####

View File

@ -21,7 +21,7 @@ if sys.platform == 'win32':
def note(format, *args): def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().get_name(),format%args)) sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class RequestHandler(SimpleHTTPRequestHandler): class RequestHandler(SimpleHTTPRequestHandler):

View File

@ -29,7 +29,7 @@ def worker(input, output):
def calculate(func, args): def calculate(func, args):
result = func(*args) result = func(*args)
return '%s says that %s%s = %s' % \ return '%s says that %s%s = %s' % \
(current_process().get_name(), func.__name__, args, result) (current_process().name, func.__name__, args, result)
# #
# Functions referenced by tasks # Functions referenced by tasks

View File

@ -292,11 +292,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
A process cannot join itself because this would cause a deadlock. It is A process cannot join itself because this would cause a deadlock. It is
an error to attempt to join a process before it has been started. an error to attempt to join a process before it has been started.
.. method:: get_name() .. attribute:: Process.name
Return the process's name. Return the process's name.
.. method:: set_name(name) .. attribute:: Process.name = name
Set the process's name. Set the process's name.
@ -311,11 +311,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
Roughly, a process object is alive from the moment the :meth:`start` Roughly, a process object is alive from the moment the :meth:`start`
method returns until the child process terminates. method returns until the child process terminates.
.. method:: is_daemon() .. attribute:: Process.daemon
Return the process's daemon flag. Return the process's daemon flag., this is a boolean.
.. method:: set_daemon(daemonic) .. attribute:: Process.daemon = daemonic
Set the process's daemon flag to the Boolean value *daemonic*. This must Set the process's daemon flag to the Boolean value *daemonic*. This must
be called before :meth:`start` is called. be called before :meth:`start` is called.
@ -331,18 +331,18 @@ The :mod:`multiprocessing` package mostly replicates the API of the
In addition process objects also support the following methods: In addition process objects also support the following methods:
.. method:: get_pid() .. attribute:: Process.pid
Return the process ID. Before the process is spawned, this will be Return the process ID. Before the process is spawned, this will be
``None``. ``None``.
.. method:: get_exit_code() .. attribute:: Process.exitcode
Return the child's exit code. This will be ``None`` if the process has Return the child's exit code. This will be ``None`` if the process has
not yet terminated. A negative value *-N* indicates that the child was not yet terminated. A negative value *-N* indicates that the child was
terminated by signal *N*. terminated by signal *N*.
.. method:: get_auth_key() .. attribute:: Process.authkey
Return the process's authentication key (a byte string). Return the process's authentication key (a byte string).
@ -351,11 +351,11 @@ The :mod:`multiprocessing` package mostly replicates the API of the
When a :class:`Process` object is created, it will inherit the When a :class:`Process` object is created, it will inherit the
authentication key of its parent process, although this may be changed authentication key of its parent process, although this may be changed
using :meth:`set_auth_key` below. using :attr:`Process.authkey` below.
See :ref:`multiprocessing-auth-keys`. See :ref:`multiprocessing-auth-keys`.
.. method:: set_auth_key(authkey) .. attribute:: Process.authkey = authkey
Set the process's authentication key which must be a byte string. Set the process's authentication key which must be a byte string.

View File

@ -47,7 +47,8 @@ class DummyProcess(threading.Thread):
self._parent._children[self] = None self._parent._children[self] = None
threading.Thread.start(self) threading.Thread.start(self)
def get_exitcode(self): @property
def exitcode(self):
if self._start_called and not self.is_alive(): if self._start_called and not self.is_alive():
return 0 return 0
else: else:

View File

@ -360,7 +360,7 @@ else:
sys_argv=sys.argv, sys_argv=sys.argv,
log_to_stderr=_log_to_stderr, log_to_stderr=_log_to_stderr,
orig_dir=process.ORIGINAL_DIR, orig_dir=process.ORIGINAL_DIR,
authkey=process.current_process().get_authkey(), authkey=process.current_process().authkey,
) )
if _logger is not None: if _logger is not None:
@ -407,7 +407,7 @@ def prepare(data):
old_main_modules.append(sys.modules['__main__']) old_main_modules.append(sys.modules['__main__'])
if 'name' in data: if 'name' in data:
process.current_process().set_name(data['name']) process.current_process().name = data['name']
if 'authkey' in data: if 'authkey' in data:
process.current_process()._authkey = data['authkey'] process.current_process()._authkey = data['authkey']

View File

@ -444,7 +444,7 @@ class BaseManager(object):
def __init__(self, address=None, authkey=None, serializer='pickle'): def __init__(self, address=None, authkey=None, serializer='pickle'):
if authkey is None: if authkey is None:
authkey = current_process().get_authkey() authkey = current_process().authkey
self._address = address # XXX not final address if eg ('', 0) self._address = address # XXX not final address if eg ('', 0)
self._authkey = AuthenticationString(authkey) self._authkey = AuthenticationString(authkey)
self._state = State() self._state = State()
@ -489,7 +489,7 @@ class BaseManager(object):
self._serializer, writer), self._serializer, writer),
) )
ident = ':'.join(str(i) for i in self._process._identity) ident = ':'.join(str(i) for i in self._process._identity)
self._process.set_name(type(self).__name__ + '-' + ident) self._process.name = type(self).__name__ + '-' + ident
self._process.start() self._process.start()
# get address of server # get address of server
@ -690,7 +690,7 @@ class BaseProxy(object):
elif self._manager is not None: elif self._manager is not None:
self._authkey = self._manager._authkey self._authkey = self._manager._authkey
else: else:
self._authkey = current_process().get_authkey() self._authkey = current_process().authkey
if incref: if incref:
self._incref() self._incref()
@ -699,7 +699,7 @@ class BaseProxy(object):
def _connect(self): def _connect(self):
util.debug('making connection to manager') util.debug('making connection to manager')
name = current_process().get_name() name = current_process().name
if threading.current_thread().name != 'MainThread': if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name name += '|' + threading.current_thread().name
conn = self._Client(self._token.address, authkey=self._authkey) conn = self._Client(self._token.address, authkey=self._authkey)
@ -880,7 +880,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
if authkey is None and manager is not None: if authkey is None and manager is not None:
authkey = manager._authkey authkey = manager._authkey
if authkey is None: if authkey is None:
authkey = current_process().get_authkey() authkey = current_process().authkey
ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,

View File

@ -99,7 +99,7 @@ class Pool(object):
args=(self._inqueue, self._outqueue, initializer, initargs) args=(self._inqueue, self._outqueue, initializer, initargs)
) )
self._pool.append(w) self._pool.append(w)
w.name = w.get_name().replace('Process', 'PoolWorker') w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True w.daemon = True
w.start() w.start()

View File

@ -132,45 +132,43 @@ class Process(object):
self._popen.poll() self._popen.poll()
return self._popen.returncode is None return self._popen.returncode is None
def get_name(self): @property
''' def name(self):
Return name of process
'''
return self._name return self._name
def set_name(self, name): @name.setter
''' def name(self, name):
Set name of process
'''
assert isinstance(name, str), 'name must be a string' assert isinstance(name, str), 'name must be a string'
self._name = name self._name = name
def is_daemon(self): @property
def daemon(self):
''' '''
Return whether process is a daemon Return whether process is a daemon
''' '''
return self._daemonic return self._daemonic
def set_daemon(self, daemonic): @daemon.setter
def daemon(self, daemonic):
''' '''
Set whether process is a daemon Set whether process is a daemon
''' '''
assert self._popen is None, 'process has already started' assert self._popen is None, 'process has already started'
self._daemonic = daemonic self._daemonic = daemonic
def get_authkey(self): @property
''' def authkey(self):
Return authorization key of process
'''
return self._authkey return self._authkey
def set_authkey(self, authkey): @authkey.setter
def authkey(self, authkey):
''' '''
Set authorization key of process Set authorization key of process
''' '''
self._authkey = AuthenticationString(authkey) self._authkey = AuthenticationString(authkey)
def get_exitcode(self): @property
def exitcode(self):
''' '''
Return exit code of process or `None` if it has yet to stop Return exit code of process or `None` if it has yet to stop
''' '''
@ -178,7 +176,8 @@ class Process(object):
return self._popen return self._popen
return self._popen.poll() return self._popen.poll()
def get_ident(self): @property
def ident(self):
''' '''
Return indentifier (PID) of process or `None` if it has yet to start Return indentifier (PID) of process or `None` if it has yet to start
''' '''
@ -187,7 +186,7 @@ class Process(object):
else: else:
return self._popen and self._popen.pid return self._popen and self._popen.pid
pid = property(get_ident) pid = ident
def __repr__(self): def __repr__(self):
if self is _current_process: if self is _current_process:
@ -198,7 +197,7 @@ class Process(object):
status = 'initial' status = 'initial'
else: else:
if self._popen.poll() is not None: if self._popen.poll() is not None:
status = self.get_exitcode() status = self.exitcode
else: else:
status = 'started' status = 'started'
@ -245,7 +244,7 @@ class Process(object):
except: except:
exitcode = 1 exitcode = 1
import traceback import traceback
sys.stderr.write('Process %s:\n' % self.get_name()) sys.stderr.write('Process %s:\n' % self.name)
sys.stderr.flush() sys.stderr.flush()
traceback.print_exc() traceback.print_exc()

View File

@ -81,9 +81,9 @@ def _get_listener():
try: try:
if _listener is None: if _listener is None:
debug('starting listener and thread for sending handles') debug('starting listener and thread for sending handles')
_listener = Listener(authkey=current_process().get_authkey()) _listener = Listener(authkey=current_process().authkey)
t = threading.Thread(target=_serve) t = threading.Thread(target=_serve)
t.set_daemon(True) t.daemon = True
t.start() t.start()
finally: finally:
_lock.release() _lock.release()
@ -126,7 +126,7 @@ def rebuild_handle(pickled_data):
if inherited: if inherited:
return handle return handle
sub_debug('rebuilding handle %d', handle) sub_debug('rebuilding handle %d', handle)
conn = Client(address, authkey=current_process().get_authkey()) conn = Client(address, authkey=current_process().authkey)
conn.send((handle, os.getpid())) conn.send((handle, os.getpid()))
new_handle = recv_handle(conn) new_handle = recv_handle(conn)
conn.close() conn.close()

View File

@ -108,9 +108,9 @@ class Lock(SemLock):
def __repr__(self): def __repr__(self):
try: try:
if self._semlock._is_mine(): if self._semlock._is_mine():
name = current_process().get_name() name = current_process().name
if threading.current_thread().get_name() != 'MainThread': if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().get_name() name += '|' + threading.current_thread().name
elif self._semlock._get_value() == 1: elif self._semlock._get_value() == 1:
name = 'None' name = 'None'
elif self._semlock._count() > 0: elif self._semlock._count() > 0:
@ -133,9 +133,9 @@ class RLock(SemLock):
def __repr__(self): def __repr__(self):
try: try:
if self._semlock._is_mine(): if self._semlock._is_mine():
name = current_process().get_name() name = current_process().name
if threading.current_thread().get_name() != 'MainThread': if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().get_name() name += '|' + threading.current_thread().name
count = self._semlock._count() count = self._semlock._count()
elif self._semlock._get_value() == 1: elif self._semlock._get_value() == 1:
name, count = 'None', 0 name, count = 'None', 0

View File

@ -273,11 +273,11 @@ def _exit_function():
for p in active_children(): for p in active_children():
if p._daemonic: if p._daemonic:
info('calling terminate() for daemon %s', p.get_name()) info('calling terminate() for daemon %s', p.name)
p._popen.terminate() p._popen.terminate()
for p in active_children(): for p in active_children():
info('calling join() for process %s', p.get_name()) info('calling join() for process %s', p.name)
p.join() p.join()
debug('running the remaining "atexit" finalizers') debug('running the remaining "atexit" finalizers')

View File

@ -119,22 +119,22 @@ class _TestProcess(BaseTestCase):
return return
current = self.current_process() current = self.current_process()
authkey = current.get_authkey() authkey = current.authkey
self.assertTrue(current.is_alive()) self.assertTrue(current.is_alive())
self.assertTrue(not current.is_daemon()) self.assertTrue(not current.daemon)
self.assertTrue(isinstance(authkey, bytes)) self.assertTrue(isinstance(authkey, bytes))
self.assertTrue(len(authkey) > 0) self.assertTrue(len(authkey) > 0)
self.assertEqual(current.get_ident(), os.getpid()) self.assertEqual(current.ident, os.getpid())
self.assertEqual(current.get_exitcode(), None) self.assertEqual(current.exitcode, None)
def _test(self, q, *args, **kwds): def _test(self, q, *args, **kwds):
current = self.current_process() current = self.current_process()
q.put(args) q.put(args)
q.put(kwds) q.put(kwds)
q.put(current.get_name()) q.put(current.name)
if self.TYPE != 'threads': if self.TYPE != 'threads':
q.put(bytes(current.get_authkey())) q.put(bytes(current.authkey))
q.put(current.pid) q.put(current.pid)
def test_process(self): def test_process(self):
@ -146,33 +146,33 @@ class _TestProcess(BaseTestCase):
p = self.Process( p = self.Process(
target=self._test, args=args, kwargs=kwargs, name=name target=self._test, args=args, kwargs=kwargs, name=name
) )
p.set_daemon(True) p.daemon = True
current = self.current_process() current = self.current_process()
if self.TYPE != 'threads': if self.TYPE != 'threads':
self.assertEquals(p.get_authkey(), current.get_authkey()) self.assertEquals(p.authkey, current.authkey)
self.assertEquals(p.is_alive(), False) self.assertEquals(p.is_alive(), False)
self.assertEquals(p.is_daemon(), True) self.assertEquals(p.daemon, True)
self.assertTrue(p not in self.active_children()) self.assertTrue(p not in self.active_children())
self.assertTrue(type(self.active_children()) is list) self.assertTrue(type(self.active_children()) is list)
self.assertEqual(p.get_exitcode(), None) self.assertEqual(p.exitcode, None)
p.start() p.start()
self.assertEquals(p.get_exitcode(), None) self.assertEquals(p.exitcode, None)
self.assertEquals(p.is_alive(), True) self.assertEquals(p.is_alive(), True)
self.assertTrue(p in self.active_children()) self.assertTrue(p in self.active_children())
self.assertEquals(q.get(), args[1:]) self.assertEquals(q.get(), args[1:])
self.assertEquals(q.get(), kwargs) self.assertEquals(q.get(), kwargs)
self.assertEquals(q.get(), p.get_name()) self.assertEquals(q.get(), p.name)
if self.TYPE != 'threads': if self.TYPE != 'threads':
self.assertEquals(q.get(), current.get_authkey()) self.assertEquals(q.get(), current.authkey)
self.assertEquals(q.get(), p.pid) self.assertEquals(q.get(), p.pid)
p.join() p.join()
self.assertEquals(p.get_exitcode(), 0) self.assertEquals(p.exitcode, 0)
self.assertEquals(p.is_alive(), False) self.assertEquals(p.is_alive(), False)
self.assertTrue(p not in self.active_children()) self.assertTrue(p not in self.active_children())
@ -184,12 +184,12 @@ class _TestProcess(BaseTestCase):
return return
p = self.Process(target=self._test_terminate) p = self.Process(target=self._test_terminate)
p.set_daemon(True) p.daemon = True
p.start() p.start()
self.assertEqual(p.is_alive(), True) self.assertEqual(p.is_alive(), True)
self.assertTrue(p in self.active_children()) self.assertTrue(p in self.active_children())
self.assertEqual(p.get_exitcode(), None) self.assertEqual(p.exitcode, None)
p.terminate() p.terminate()
@ -202,8 +202,8 @@ class _TestProcess(BaseTestCase):
p.join() p.join()
# XXX sometimes get p.get_exitcode() == 0 on Windows ... # XXX sometimes get p.exitcode == 0 on Windows ...
#self.assertEqual(p.get_exitcode(), -signal.SIGTERM) #self.assertEqual(p.exitcode, -signal.SIGTERM)
def test_cpu_count(self): def test_cpu_count(self):
try: try:
@ -330,7 +330,7 @@ class _TestQueue(BaseTestCase):
target=self._test_put, target=self._test_put,
args=(queue, child_can_start, parent_can_continue) args=(queue, child_can_start, parent_can_continue)
) )
proc.set_daemon(True) proc.daemon = True
proc.start() proc.start()
self.assertEqual(queue_empty(queue), True) self.assertEqual(queue_empty(queue), True)
@ -396,7 +396,7 @@ class _TestQueue(BaseTestCase):
target=self._test_get, target=self._test_get,
args=(queue, child_can_start, parent_can_continue) args=(queue, child_can_start, parent_can_continue)
) )
proc.set_daemon(True) proc.daemon = True
proc.start() proc.start()
self.assertEqual(queue_empty(queue), True) self.assertEqual(queue_empty(queue), True)
@ -619,16 +619,10 @@ class _TestCondition(BaseTestCase):
woken = self.Semaphore(0) woken = self.Semaphore(0)
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
try:
p.set_daemon(True)
except AttributeError:
p.daemon = True p.daemon = True
p.start() p.start()
p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
try:
p.set_daemon(True)
except AttributeError:
p.daemon = True p.daemon = True
p.start() p.start()
@ -671,7 +665,7 @@ class _TestCondition(BaseTestCase):
for i in range(3): for i in range(3):
p = self.Process(target=self.f, p = self.Process(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1)) args=(cond, sleeping, woken, TIMEOUT1))
p.set_daemon(True) p.daemon = True
p.start() p.start()
t = threading.Thread(target=self.f, t = threading.Thread(target=self.f,
@ -694,7 +688,7 @@ class _TestCondition(BaseTestCase):
# start some more threads/processes # start some more threads/processes
for i in range(3): for i in range(3):
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.set_daemon(True) p.daemon = True
p.start() p.start()
t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
@ -1191,7 +1185,7 @@ class _TestConnection(BaseTestCase):
conn, child_conn = self.Pipe() conn, child_conn = self.Pipe()
p = self.Process(target=self._echo, args=(child_conn,)) p = self.Process(target=self._echo, args=(child_conn,))
p.set_daemon(True) p.daemon = True
p.start() p.start()
seq = [1, 2.25, None] seq = [1, 2.25, None]
@ -1340,7 +1334,7 @@ class _TestListenerClient(BaseTestCase):
for family in self.connection.families: for family in self.connection.families:
l = self.connection.Listener(family=family) l = self.connection.Listener(family=family)
p = self.Process(target=self._test, args=(l.address,)) p = self.Process(target=self._test, args=(l.address,))
p.set_daemon(True) p.daemon = True
p.start() p.start()
conn = l.accept() conn = l.accept()
self.assertEqual(conn.recv(), 'hello') self.assertEqual(conn.recv(), 'hello')