363 lines
11 KiB
Python
363 lines
11 KiB
Python
#
|
|
# Module to allow spawning of processes on foreign host
|
|
#
|
|
# Depends on `multiprocessing` package -- tested with `processing-0.60`
|
|
#
|
|
|
|
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
|
|
|
|
#
|
|
# Imports
|
|
#
|
|
|
|
import sys
|
|
import os
|
|
import tarfile
|
|
import shutil
|
|
import subprocess
|
|
import logging
|
|
import itertools
|
|
import Queue
|
|
|
|
try:
|
|
import cPickle as pickle
|
|
except ImportError:
|
|
import pickle
|
|
|
|
from multiprocessing import Process, current_process, cpu_count
|
|
from multiprocessing import util, managers, connection, forking, pool
|
|
|
|
#
|
|
# Logging
|
|
#
|
|
|
|
def get_logger():
|
|
return _logger
|
|
|
|
_logger = logging.getLogger('distributing')
|
|
_logger.propogate = 0
|
|
|
|
util.fix_up_logger(_logger)
|
|
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
|
|
_handler = logging.StreamHandler()
|
|
_handler.setFormatter(_formatter)
|
|
_logger.addHandler(_handler)
|
|
|
|
info = _logger.info
|
|
debug = _logger.debug
|
|
|
|
#
|
|
# Get number of cpus
|
|
#
|
|
|
|
try:
|
|
slot_count = cpu_count()
|
|
except NotImplemented:
|
|
slot_count = 1
|
|
|
|
#
|
|
# Manager type which spawns subprocesses
|
|
#
|
|
|
|
class HostManager(managers.SyncManager):
|
|
'''
|
|
Manager type used for spawning processes on a (presumably) foreign host
|
|
'''
|
|
def __init__(self, address, authkey):
|
|
managers.SyncManager.__init__(self, address, authkey)
|
|
self._name = 'Host-unknown'
|
|
|
|
def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
|
|
if hasattr(sys.modules['__main__'], '__file__'):
|
|
main_path = os.path.basename(sys.modules['__main__'].__file__)
|
|
else:
|
|
main_path = None
|
|
data = pickle.dumps((target, args, kwargs))
|
|
p = self._RemoteProcess(data, main_path)
|
|
if name is None:
|
|
temp = self._name.split('Host-')[-1] + '/Process-%s'
|
|
name = temp % ':'.join(map(str, p.get_identity()))
|
|
p.set_name(name)
|
|
return p
|
|
|
|
@classmethod
|
|
def from_address(cls, address, authkey):
|
|
manager = cls(address, authkey)
|
|
managers.transact(address, authkey, 'dummy')
|
|
manager._state.value = managers.State.STARTED
|
|
manager._name = 'Host-%s:%s' % manager.address
|
|
manager.shutdown = util.Finalize(
|
|
manager, HostManager._finalize_host,
|
|
args=(manager._address, manager._authkey, manager._name),
|
|
exitpriority=-10
|
|
)
|
|
return manager
|
|
|
|
@staticmethod
|
|
def _finalize_host(address, authkey, name):
|
|
managers.transact(address, authkey, 'shutdown')
|
|
|
|
def __repr__(self):
|
|
return '<Host(%s)>' % self._name
|
|
|
|
#
|
|
# Process subclass representing a process on (possibly) a remote machine
|
|
#
|
|
|
|
class RemoteProcess(Process):
|
|
'''
|
|
Represents a process started on a remote host
|
|
'''
|
|
def __init__(self, data, main_path):
|
|
assert not main_path or os.path.basename(main_path) == main_path
|
|
Process.__init__(self)
|
|
self._data = data
|
|
self._main_path = main_path
|
|
|
|
def _bootstrap(self):
|
|
forking.prepare({'main_path': self._main_path})
|
|
self._target, self._args, self._kwargs = pickle.loads(self._data)
|
|
return Process._bootstrap(self)
|
|
|
|
def get_identity(self):
|
|
return self._identity
|
|
|
|
HostManager.register('_RemoteProcess', RemoteProcess)
|
|
|
|
#
|
|
# A Pool class that uses a cluster
|
|
#
|
|
|
|
class DistributedPool(pool.Pool):
|
|
|
|
def __init__(self, cluster, processes=None, initializer=None, initargs=()):
|
|
self._cluster = cluster
|
|
self.Process = cluster.Process
|
|
pool.Pool.__init__(self, processes or len(cluster),
|
|
initializer, initargs)
|
|
|
|
def _setup_queues(self):
|
|
self._inqueue = self._cluster._SettableQueue()
|
|
self._outqueue = self._cluster._SettableQueue()
|
|
self._quick_put = self._inqueue.put
|
|
self._quick_get = self._outqueue.get
|
|
|
|
@staticmethod
|
|
def _help_stuff_finish(inqueue, task_handler, size):
|
|
inqueue.set_contents([None] * size)
|
|
|
|
#
|
|
# Manager type which starts host managers on other machines
|
|
#
|
|
|
|
def LocalProcess(**kwds):
|
|
p = Process(**kwds)
|
|
p.set_name('localhost/' + p.get_name())
|
|
return p
|
|
|
|
class Cluster(managers.SyncManager):
|
|
'''
|
|
Represents collection of slots running on various hosts.
|
|
|
|
`Cluster` is a subclass of `SyncManager` so it allows creation of
|
|
various types of shared objects.
|
|
'''
|
|
def __init__(self, hostlist, modules):
|
|
managers.SyncManager.__init__(self, address=('localhost', 0))
|
|
self._hostlist = hostlist
|
|
self._modules = modules
|
|
if __name__ not in modules:
|
|
modules.append(__name__)
|
|
files = [sys.modules[name].__file__ for name in modules]
|
|
for i, file in enumerate(files):
|
|
if file.endswith('.pyc') or file.endswith('.pyo'):
|
|
files[i] = file[:-4] + '.py'
|
|
self._files = [os.path.abspath(file) for file in files]
|
|
|
|
def start(self):
|
|
managers.SyncManager.start(self)
|
|
|
|
l = connection.Listener(family='AF_INET', authkey=self._authkey)
|
|
|
|
for i, host in enumerate(self._hostlist):
|
|
host._start_manager(i, self._authkey, l.address, self._files)
|
|
|
|
for host in self._hostlist:
|
|
if host.hostname != 'localhost':
|
|
conn = l.accept()
|
|
i, address, cpus = conn.recv()
|
|
conn.close()
|
|
other_host = self._hostlist[i]
|
|
other_host.manager = HostManager.from_address(address,
|
|
self._authkey)
|
|
other_host.slots = other_host.slots or cpus
|
|
other_host.Process = other_host.manager.Process
|
|
else:
|
|
host.slots = host.slots or slot_count
|
|
host.Process = LocalProcess
|
|
|
|
self._slotlist = [
|
|
Slot(host) for host in self._hostlist for i in range(host.slots)
|
|
]
|
|
self._slot_iterator = itertools.cycle(self._slotlist)
|
|
self._base_shutdown = self.shutdown
|
|
del self.shutdown
|
|
|
|
def shutdown(self):
|
|
for host in self._hostlist:
|
|
if host.hostname != 'localhost':
|
|
host.manager.shutdown()
|
|
self._base_shutdown()
|
|
|
|
def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
|
|
slot = self._slot_iterator.next()
|
|
return slot.Process(
|
|
group=group, target=target, name=name, args=args, kwargs=kwargs
|
|
)
|
|
|
|
def Pool(self, processes=None, initializer=None, initargs=()):
|
|
return DistributedPool(self, processes, initializer, initargs)
|
|
|
|
def __getitem__(self, i):
|
|
return self._slotlist[i]
|
|
|
|
def __len__(self):
|
|
return len(self._slotlist)
|
|
|
|
def __iter__(self):
|
|
return iter(self._slotlist)
|
|
|
|
#
|
|
# Queue subclass used by distributed pool
|
|
#
|
|
|
|
class SettableQueue(Queue.Queue):
|
|
def empty(self):
|
|
return not self.queue
|
|
def full(self):
|
|
return self.maxsize > 0 and len(self.queue) == self.maxsize
|
|
def set_contents(self, contents):
|
|
# length of contents must be at least as large as the number of
|
|
# threads which have potentially called get()
|
|
self.not_empty.acquire()
|
|
try:
|
|
self.queue.clear()
|
|
self.queue.extend(contents)
|
|
self.not_empty.notifyAll()
|
|
finally:
|
|
self.not_empty.release()
|
|
|
|
Cluster.register('_SettableQueue', SettableQueue)
|
|
|
|
#
|
|
# Class representing a notional cpu in the cluster
|
|
#
|
|
|
|
class Slot(object):
|
|
def __init__(self, host):
|
|
self.host = host
|
|
self.Process = host.Process
|
|
|
|
#
|
|
# Host
|
|
#
|
|
|
|
class Host(object):
|
|
'''
|
|
Represents a host to use as a node in a cluster.
|
|
|
|
`hostname` gives the name of the host. If hostname is not
|
|
"localhost" then ssh is used to log in to the host. To log in as
|
|
a different user use a host name of the form
|
|
"username@somewhere.org"
|
|
|
|
`slots` is used to specify the number of slots for processes on
|
|
the host. This affects how often processes will be allocated to
|
|
this host. Normally this should be equal to the number of cpus on
|
|
that host.
|
|
'''
|
|
def __init__(self, hostname, slots=None):
|
|
self.hostname = hostname
|
|
self.slots = slots
|
|
|
|
def _start_manager(self, index, authkey, address, files):
|
|
if self.hostname != 'localhost':
|
|
tempdir = copy_to_remote_temporary_directory(self.hostname, files)
|
|
debug('startup files copied to %s:%s', self.hostname, tempdir)
|
|
p = subprocess.Popen(
|
|
['ssh', self.hostname, 'python', '-c',
|
|
'"import os; os.chdir(%r); '
|
|
'from distributing import main; main()"' % tempdir],
|
|
stdin=subprocess.PIPE
|
|
)
|
|
data = dict(
|
|
name='BoostrappingHost', index=index,
|
|
dist_log_level=_logger.getEffectiveLevel(),
|
|
dir=tempdir, authkey=str(authkey), parent_address=address
|
|
)
|
|
pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
|
|
p.stdin.close()
|
|
|
|
#
|
|
# Copy files to remote directory, returning name of directory
|
|
#
|
|
|
|
unzip_code = '''"
|
|
import tempfile, os, sys, tarfile
|
|
tempdir = tempfile.mkdtemp(prefix='distrib-')
|
|
os.chdir(tempdir)
|
|
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
|
|
for ti in tf:
|
|
tf.extract(ti)
|
|
print tempdir
|
|
"'''
|
|
|
|
def copy_to_remote_temporary_directory(host, files):
|
|
p = subprocess.Popen(
|
|
['ssh', host, 'python', '-c', unzip_code],
|
|
stdout=subprocess.PIPE, stdin=subprocess.PIPE
|
|
)
|
|
tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
|
|
for name in files:
|
|
tf.add(name, os.path.basename(name))
|
|
tf.close()
|
|
p.stdin.close()
|
|
return p.stdout.read().rstrip()
|
|
|
|
#
|
|
# Code which runs a host manager
|
|
#
|
|
|
|
def main():
|
|
# get data from parent over stdin
|
|
data = pickle.load(sys.stdin)
|
|
sys.stdin.close()
|
|
|
|
# set some stuff
|
|
_logger.setLevel(data['dist_log_level'])
|
|
forking.prepare(data)
|
|
|
|
# create server for a `HostManager` object
|
|
server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
|
|
current_process()._server = server
|
|
|
|
# report server address and number of cpus back to parent
|
|
conn = connection.Client(data['parent_address'], authkey=data['authkey'])
|
|
conn.send((data['index'], server.address, slot_count))
|
|
conn.close()
|
|
|
|
# set name etc
|
|
current_process().set_name('Host-%s:%s' % server.address)
|
|
util._run_after_forkers()
|
|
|
|
# register a cleanup function
|
|
def cleanup(directory):
|
|
debug('removing directory %s', directory)
|
|
shutil.rmtree(directory)
|
|
debug('shutting down host manager')
|
|
util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
|
|
|
|
# start host manager
|
|
debug('remote host manager starting in %s', data['dir'])
|
|
server.serve_forever()
|