issue5738: The distribution example was confusing, and out of date. It's too large to include inline in the docs as well. It belongs in an addons module outside the stdlib. Removing.
This commit is contained in:
parent
c4920e86ef
commit
6c3767445c
|
@ -1,364 +0,0 @@
|
|||
#
|
||||
# Module to allow spawning of processes on foreign host
|
||||
#
|
||||
# Depends on `multiprocessing` package -- tested with `processing-0.60`
|
||||
#
|
||||
# Copyright (c) 2006-2008, R Oudkerk
|
||||
# All rights reserved.
|
||||
#
|
||||
|
||||
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
|
||||
|
||||
#
|
||||
# Imports
|
||||
#
|
||||
|
||||
import sys
|
||||
import os
|
||||
import tarfile
|
||||
import shutil
|
||||
import subprocess
|
||||
import logging
|
||||
import itertools
|
||||
import Queue
|
||||
|
||||
try:
|
||||
import cPickle as pickle
|
||||
except ImportError:
|
||||
import pickle
|
||||
|
||||
from multiprocessing import Process, current_process, cpu_count
|
||||
from multiprocessing import util, managers, connection, forking, pool
|
||||
|
||||
#
|
||||
# Logging
|
||||
#
|
||||
|
||||
def get_logger():
|
||||
return _logger
|
||||
|
||||
_logger = logging.getLogger('distributing')
|
||||
_logger.propagate = 0
|
||||
|
||||
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
|
||||
_handler = logging.StreamHandler()
|
||||
_handler.setFormatter(_formatter)
|
||||
_logger.addHandler(_handler)
|
||||
|
||||
info = _logger.info
|
||||
debug = _logger.debug
|
||||
|
||||
#
|
||||
# Get number of cpus
|
||||
#
|
||||
|
||||
try:
|
||||
slot_count = cpu_count()
|
||||
except NotImplemented:
|
||||
slot_count = 1
|
||||
|
||||
#
|
||||
# Manager type which spawns subprocesses
|
||||
#
|
||||
|
||||
class HostManager(managers.SyncManager):
|
||||
'''
|
||||
Manager type used for spawning processes on a (presumably) foreign host
|
||||
'''
|
||||
def __init__(self, address, authkey):
|
||||
managers.SyncManager.__init__(self, address, authkey)
|
||||
self._name = 'Host-unknown'
|
||||
|
||||
def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
|
||||
if hasattr(sys.modules['__main__'], '__file__'):
|
||||
main_path = os.path.basename(sys.modules['__main__'].__file__)
|
||||
else:
|
||||
main_path = None
|
||||
data = pickle.dumps((target, args, kwargs))
|
||||
p = self._RemoteProcess(data, main_path)
|
||||
if name is None:
|
||||
temp = self._name.split('Host-')[-1] + '/Process-%s'
|
||||
name = temp % ':'.join(map(str, p.get_identity()))
|
||||
p.set_name(name)
|
||||
return p
|
||||
|
||||
@classmethod
|
||||
def from_address(cls, address, authkey):
|
||||
manager = cls(address, authkey)
|
||||
managers.transact(address, authkey, 'dummy')
|
||||
manager._state.value = managers.State.STARTED
|
||||
manager._name = 'Host-%s:%s' % manager.address
|
||||
manager.shutdown = util.Finalize(
|
||||
manager, HostManager._finalize_host,
|
||||
args=(manager._address, manager._authkey, manager._name),
|
||||
exitpriority=-10
|
||||
)
|
||||
return manager
|
||||
|
||||
@staticmethod
|
||||
def _finalize_host(address, authkey, name):
|
||||
managers.transact(address, authkey, 'shutdown')
|
||||
|
||||
def __repr__(self):
|
||||
return '<Host(%s)>' % self._name
|
||||
|
||||
#
|
||||
# Process subclass representing a process on (possibly) a remote machine
|
||||
#
|
||||
|
||||
class RemoteProcess(Process):
|
||||
'''
|
||||
Represents a process started on a remote host
|
||||
'''
|
||||
def __init__(self, data, main_path):
|
||||
assert not main_path or os.path.basename(main_path) == main_path
|
||||
Process.__init__(self)
|
||||
self._data = data
|
||||
self._main_path = main_path
|
||||
|
||||
def _bootstrap(self):
|
||||
forking.prepare({'main_path': self._main_path})
|
||||
self._target, self._args, self._kwargs = pickle.loads(self._data)
|
||||
return Process._bootstrap(self)
|
||||
|
||||
def get_identity(self):
|
||||
return self._identity
|
||||
|
||||
HostManager.register('_RemoteProcess', RemoteProcess)
|
||||
|
||||
#
|
||||
# A Pool class that uses a cluster
|
||||
#
|
||||
|
||||
class DistributedPool(pool.Pool):
|
||||
|
||||
def __init__(self, cluster, processes=None, initializer=None, initargs=()):
|
||||
self._cluster = cluster
|
||||
self.Process = cluster.Process
|
||||
pool.Pool.__init__(self, processes or len(cluster),
|
||||
initializer, initargs)
|
||||
|
||||
def _setup_queues(self):
|
||||
self._inqueue = self._cluster._SettableQueue()
|
||||
self._outqueue = self._cluster._SettableQueue()
|
||||
self._quick_put = self._inqueue.put
|
||||
self._quick_get = self._outqueue.get
|
||||
|
||||
@staticmethod
|
||||
def _help_stuff_finish(inqueue, task_handler, size):
|
||||
inqueue.set_contents([None] * size)
|
||||
|
||||
#
|
||||
# Manager type which starts host managers on other machines
|
||||
#
|
||||
|
||||
def LocalProcess(**kwds):
|
||||
p = Process(**kwds)
|
||||
p.set_name('localhost/' + p.name)
|
||||
return p
|
||||
|
||||
class Cluster(managers.SyncManager):
|
||||
'''
|
||||
Represents collection of slots running on various hosts.
|
||||
|
||||
`Cluster` is a subclass of `SyncManager` so it allows creation of
|
||||
various types of shared objects.
|
||||
'''
|
||||
def __init__(self, hostlist, modules):
|
||||
managers.SyncManager.__init__(self, address=('localhost', 0))
|
||||
self._hostlist = hostlist
|
||||
self._modules = modules
|
||||
if __name__ not in modules:
|
||||
modules.append(__name__)
|
||||
files = [sys.modules[name].__file__ for name in modules]
|
||||
for i, file in enumerate(files):
|
||||
if file.endswith('.pyc') or file.endswith('.pyo'):
|
||||
files[i] = file[:-4] + '.py'
|
||||
self._files = [os.path.abspath(file) for file in files]
|
||||
|
||||
def start(self):
|
||||
managers.SyncManager.start(self)
|
||||
|
||||
l = connection.Listener(family='AF_INET', authkey=self._authkey)
|
||||
|
||||
for i, host in enumerate(self._hostlist):
|
||||
host._start_manager(i, self._authkey, l.address, self._files)
|
||||
|
||||
for host in self._hostlist:
|
||||
if host.hostname != 'localhost':
|
||||
conn = l.accept()
|
||||
i, address, cpus = conn.recv()
|
||||
conn.close()
|
||||
other_host = self._hostlist[i]
|
||||
other_host.manager = HostManager.from_address(address,
|
||||
self._authkey)
|
||||
other_host.slots = other_host.slots or cpus
|
||||
other_host.Process = other_host.manager.Process
|
||||
else:
|
||||
host.slots = host.slots or slot_count
|
||||
host.Process = LocalProcess
|
||||
|
||||
self._slotlist = [
|
||||
Slot(host) for host in self._hostlist for i in range(host.slots)
|
||||
]
|
||||
self._slot_iterator = itertools.cycle(self._slotlist)
|
||||
self._base_shutdown = self.shutdown
|
||||
del self.shutdown
|
||||
|
||||
def shutdown(self):
|
||||
for host in self._hostlist:
|
||||
if host.hostname != 'localhost':
|
||||
host.manager.shutdown()
|
||||
self._base_shutdown()
|
||||
|
||||
def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
|
||||
slot = self._slot_iterator.next()
|
||||
return slot.Process(
|
||||
group=group, target=target, name=name, args=args, kwargs=kwargs
|
||||
)
|
||||
|
||||
def Pool(self, processes=None, initializer=None, initargs=()):
|
||||
return DistributedPool(self, processes, initializer, initargs)
|
||||
|
||||
def __getitem__(self, i):
|
||||
return self._slotlist[i]
|
||||
|
||||
def __len__(self):
|
||||
return len(self._slotlist)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._slotlist)
|
||||
|
||||
#
|
||||
# Queue subclass used by distributed pool
|
||||
#
|
||||
|
||||
class SettableQueue(Queue.Queue):
|
||||
def empty(self):
|
||||
return not self.queue
|
||||
def full(self):
|
||||
return self.maxsize > 0 and len(self.queue) == self.maxsize
|
||||
def set_contents(self, contents):
|
||||
# length of contents must be at least as large as the number of
|
||||
# threads which have potentially called get()
|
||||
self.not_empty.acquire()
|
||||
try:
|
||||
self.queue.clear()
|
||||
self.queue.extend(contents)
|
||||
self.not_empty.notifyAll()
|
||||
finally:
|
||||
self.not_empty.release()
|
||||
|
||||
Cluster.register('_SettableQueue', SettableQueue)
|
||||
|
||||
#
|
||||
# Class representing a notional cpu in the cluster
|
||||
#
|
||||
|
||||
class Slot(object):
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
self.Process = host.Process
|
||||
|
||||
#
|
||||
# Host
|
||||
#
|
||||
|
||||
class Host(object):
|
||||
'''
|
||||
Represents a host to use as a node in a cluster.
|
||||
|
||||
`hostname` gives the name of the host. If hostname is not
|
||||
"localhost" then ssh is used to log in to the host. To log in as
|
||||
a different user use a host name of the form
|
||||
"username@somewhere.org"
|
||||
|
||||
`slots` is used to specify the number of slots for processes on
|
||||
the host. This affects how often processes will be allocated to
|
||||
this host. Normally this should be equal to the number of cpus on
|
||||
that host.
|
||||
'''
|
||||
def __init__(self, hostname, slots=None):
|
||||
self.hostname = hostname
|
||||
self.slots = slots
|
||||
|
||||
def _start_manager(self, index, authkey, address, files):
|
||||
if self.hostname != 'localhost':
|
||||
tempdir = copy_to_remote_temporary_directory(self.hostname, files)
|
||||
debug('startup files copied to %s:%s', self.hostname, tempdir)
|
||||
p = subprocess.Popen(
|
||||
['ssh', self.hostname, 'python', '-c',
|
||||
'"import os; os.chdir(%r); '
|
||||
'from distributing import main; main()"' % tempdir],
|
||||
stdin=subprocess.PIPE
|
||||
)
|
||||
data = dict(
|
||||
name='BoostrappingHost', index=index,
|
||||
dist_log_level=_logger.getEffectiveLevel(),
|
||||
dir=tempdir, authkey=str(authkey), parent_address=address
|
||||
)
|
||||
pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
|
||||
p.stdin.close()
|
||||
|
||||
#
|
||||
# Copy files to remote directory, returning name of directory
|
||||
#
|
||||
|
||||
unzip_code = '''"
|
||||
import tempfile, os, sys, tarfile
|
||||
tempdir = tempfile.mkdtemp(prefix='distrib-')
|
||||
os.chdir(tempdir)
|
||||
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
|
||||
for ti in tf:
|
||||
tf.extract(ti)
|
||||
print tempdir
|
||||
"'''
|
||||
|
||||
def copy_to_remote_temporary_directory(host, files):
|
||||
p = subprocess.Popen(
|
||||
['ssh', host, 'python', '-c', unzip_code],
|
||||
stdout=subprocess.PIPE, stdin=subprocess.PIPE
|
||||
)
|
||||
tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
|
||||
for name in files:
|
||||
tf.add(name, os.path.basename(name))
|
||||
tf.close()
|
||||
p.stdin.close()
|
||||
return p.stdout.read().rstrip()
|
||||
|
||||
#
|
||||
# Code which runs a host manager
|
||||
#
|
||||
|
||||
def main():
|
||||
# get data from parent over stdin
|
||||
data = pickle.load(sys.stdin)
|
||||
sys.stdin.close()
|
||||
|
||||
# set some stuff
|
||||
_logger.setLevel(data['dist_log_level'])
|
||||
forking.prepare(data)
|
||||
|
||||
# create server for a `HostManager` object
|
||||
server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
|
||||
current_process()._server = server
|
||||
|
||||
# report server address and number of cpus back to parent
|
||||
conn = connection.Client(data['parent_address'], authkey=data['authkey'])
|
||||
conn.send((data['index'], server.address, slot_count))
|
||||
conn.close()
|
||||
|
||||
# set name etc
|
||||
current_process().set_name('Host-%s:%s' % server.address)
|
||||
util._run_after_forkers()
|
||||
|
||||
# register a cleanup function
|
||||
def cleanup(directory):
|
||||
debug('removing directory %s', directory)
|
||||
shutil.rmtree(directory)
|
||||
debug('shutting down host manager')
|
||||
util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
|
||||
|
||||
# start host manager
|
||||
debug('remote host manager starting in %s', data['dir'])
|
||||
server.serve_forever()
|
|
@ -2230,10 +2230,3 @@ Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
|
|||
|
||||
.. literalinclude:: ../includes/mp_benchmarks.py
|
||||
|
||||
An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
|
||||
and others to build a system which can distribute processes and work via a
|
||||
distributed queue to a "cluster" of machines on a network, accessible via SSH.
|
||||
You will need to have private key authentication for all hosts configured for
|
||||
this to work.
|
||||
|
||||
.. literalinclude:: ../includes/mp_distributing.py
|
||||
|
|
|
@ -47,6 +47,8 @@ class Queue(object):
|
|||
if sys.platform != 'win32':
|
||||
register_after_fork(self, Queue._after_fork)
|
||||
|
||||
self.getv = 0
|
||||
|
||||
def __getstate__(self):
|
||||
assert_spawning(self)
|
||||
return (self._maxsize, self._reader, self._writer,
|
||||
|
@ -71,6 +73,8 @@ class Queue(object):
|
|||
self._poll = self._reader.poll
|
||||
|
||||
def put(self, obj, block=True, timeout=None):
|
||||
if not isinstance(obj, list):
|
||||
debug('put: %s', obj)
|
||||
assert not self._closed
|
||||
if not self._sem.acquire(block, timeout):
|
||||
raise Full
|
||||
|
@ -85,11 +89,15 @@ class Queue(object):
|
|||
self._notempty.release()
|
||||
|
||||
def get(self, block=True, timeout=None):
|
||||
self.getv += 1
|
||||
debug('self.getv: %s', self.getv)
|
||||
if block and timeout is None:
|
||||
self._rlock.acquire()
|
||||
try:
|
||||
res = self._recv()
|
||||
self._sem.release()
|
||||
if not isinstance(res, list):
|
||||
debug('get: %s', res)
|
||||
return res
|
||||
finally:
|
||||
self._rlock.release()
|
||||
|
@ -104,6 +112,8 @@ class Queue(object):
|
|||
raise Empty
|
||||
res = self._recv()
|
||||
self._sem.release()
|
||||
if not isinstance(res, list):
|
||||
debug('get: %s', res)
|
||||
return res
|
||||
finally:
|
||||
self._rlock.release()
|
||||
|
@ -229,16 +239,22 @@ class Queue(object):
|
|||
try:
|
||||
while 1:
|
||||
obj = bpopleft()
|
||||
if not isinstance(obj, list):
|
||||
debug('feeder thread got: %s', obj)
|
||||
if obj is sentinel:
|
||||
debug('feeder thread got sentinel -- exiting')
|
||||
close()
|
||||
return
|
||||
|
||||
if wacquire is None:
|
||||
if not isinstance(obj, list):
|
||||
debug('sending to pipe: %s', obj)
|
||||
send(obj)
|
||||
else:
|
||||
wacquire()
|
||||
debug('waiting on wacquire')
|
||||
wacquire(timeout=30)
|
||||
try:
|
||||
if not isinstance(obj, list):
|
||||
debug('sending to pipe: %s', obj)
|
||||
send(obj)
|
||||
finally:
|
||||
wrelease()
|
||||
|
|
Loading…
Reference in New Issue