# # Module to allow spawning of processes on foreign host # # Depends on `multiprocessing` package -- tested with `processing-0.60` # __all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] # # Imports # import sys import os import tarfile import shutil import subprocess import logging import itertools import Queue try: import cPickle as pickle except ImportError: import pickle from multiprocessing import Process, current_process, cpu_count from multiprocessing import util, managers, connection, forking, pool # # Logging # def get_logger(): return _logger _logger = logging.getLogger('distributing') _logger.propogate = 0 util.fix_up_logger(_logger) _formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT) _handler = logging.StreamHandler() _handler.setFormatter(_formatter) _logger.addHandler(_handler) info = _logger.info debug = _logger.debug # # Get number of cpus # try: slot_count = cpu_count() except NotImplemented: slot_count = 1 # # Manager type which spawns subprocesses # class HostManager(managers.SyncManager): ''' Manager type used for spawning processes on a (presumably) foreign host ''' def __init__(self, address, authkey): managers.SyncManager.__init__(self, address, authkey) self._name = 'Host-unknown' def Process(self, group=None, target=None, name=None, args=(), kwargs={}): if hasattr(sys.modules['__main__'], '__file__'): main_path = os.path.basename(sys.modules['__main__'].__file__) else: main_path = None data = pickle.dumps((target, args, kwargs)) p = self._RemoteProcess(data, main_path) if name is None: temp = self._name.split('Host-')[-1] + '/Process-%s' name = temp % ':'.join(map(str, p.get_identity())) p.set_name(name) return p @classmethod def from_address(cls, address, authkey): manager = cls(address, authkey) managers.transact(address, authkey, 'dummy') manager._state.value = managers.State.STARTED manager._name = 'Host-%s:%s' % manager.address manager.shutdown = util.Finalize( manager, HostManager._finalize_host, args=(manager._address, manager._authkey, manager._name), exitpriority=-10 ) return manager @staticmethod def _finalize_host(address, authkey, name): managers.transact(address, authkey, 'shutdown') def __repr__(self): return '' % self._name # # Process subclass representing a process on (possibly) a remote machine # class RemoteProcess(Process): ''' Represents a process started on a remote host ''' def __init__(self, data, main_path): assert not main_path or os.path.basename(main_path) == main_path Process.__init__(self) self._data = data self._main_path = main_path def _bootstrap(self): forking.prepare({'main_path': self._main_path}) self._target, self._args, self._kwargs = pickle.loads(self._data) return Process._bootstrap(self) def get_identity(self): return self._identity HostManager.register('_RemoteProcess', RemoteProcess) # # A Pool class that uses a cluster # class DistributedPool(pool.Pool): def __init__(self, cluster, processes=None, initializer=None, initargs=()): self._cluster = cluster self.Process = cluster.Process pool.Pool.__init__(self, processes or len(cluster), initializer, initargs) def _setup_queues(self): self._inqueue = self._cluster._SettableQueue() self._outqueue = self._cluster._SettableQueue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get @staticmethod def _help_stuff_finish(inqueue, task_handler, size): inqueue.set_contents([None] * size) # # Manager type which starts host managers on other machines # def LocalProcess(**kwds): p = Process(**kwds) p.set_name('localhost/' + p.name) return p class Cluster(managers.SyncManager): ''' Represents collection of slots running on various hosts. `Cluster` is a subclass of `SyncManager` so it allows creation of various types of shared objects. ''' def __init__(self, hostlist, modules): managers.SyncManager.__init__(self, address=('localhost', 0)) self._hostlist = hostlist self._modules = modules if __name__ not in modules: modules.append(__name__) files = [sys.modules[name].__file__ for name in modules] for i, file in enumerate(files): if file.endswith('.pyc') or file.endswith('.pyo'): files[i] = file[:-4] + '.py' self._files = [os.path.abspath(file) for file in files] def start(self): managers.SyncManager.start(self) l = connection.Listener(family='AF_INET', authkey=self._authkey) for i, host in enumerate(self._hostlist): host._start_manager(i, self._authkey, l.address, self._files) for host in self._hostlist: if host.hostname != 'localhost': conn = l.accept() i, address, cpus = conn.recv() conn.close() other_host = self._hostlist[i] other_host.manager = HostManager.from_address(address, self._authkey) other_host.slots = other_host.slots or cpus other_host.Process = other_host.manager.Process else: host.slots = host.slots or slot_count host.Process = LocalProcess self._slotlist = [ Slot(host) for host in self._hostlist for i in range(host.slots) ] self._slot_iterator = itertools.cycle(self._slotlist) self._base_shutdown = self.shutdown del self.shutdown def shutdown(self): for host in self._hostlist: if host.hostname != 'localhost': host.manager.shutdown() self._base_shutdown() def Process(self, group=None, target=None, name=None, args=(), kwargs={}): slot = self._slot_iterator.next() return slot.Process( group=group, target=target, name=name, args=args, kwargs=kwargs ) def Pool(self, processes=None, initializer=None, initargs=()): return DistributedPool(self, processes, initializer, initargs) def __getitem__(self, i): return self._slotlist[i] def __len__(self): return len(self._slotlist) def __iter__(self): return iter(self._slotlist) # # Queue subclass used by distributed pool # class SettableQueue(Queue.Queue): def empty(self): return not self.queue def full(self): return self.maxsize > 0 and len(self.queue) == self.maxsize def set_contents(self, contents): # length of contents must be at least as large as the number of # threads which have potentially called get() self.not_empty.acquire() try: self.queue.clear() self.queue.extend(contents) self.not_empty.notifyAll() finally: self.not_empty.release() Cluster.register('_SettableQueue', SettableQueue) # # Class representing a notional cpu in the cluster # class Slot(object): def __init__(self, host): self.host = host self.Process = host.Process # # Host # class Host(object): ''' Represents a host to use as a node in a cluster. `hostname` gives the name of the host. If hostname is not "localhost" then ssh is used to log in to the host. To log in as a different user use a host name of the form "username@somewhere.org" `slots` is used to specify the number of slots for processes on the host. This affects how often processes will be allocated to this host. Normally this should be equal to the number of cpus on that host. ''' def __init__(self, hostname, slots=None): self.hostname = hostname self.slots = slots def _start_manager(self, index, authkey, address, files): if self.hostname != 'localhost': tempdir = copy_to_remote_temporary_directory(self.hostname, files) debug('startup files copied to %s:%s', self.hostname, tempdir) p = subprocess.Popen( ['ssh', self.hostname, 'python', '-c', '"import os; os.chdir(%r); ' 'from distributing import main; main()"' % tempdir], stdin=subprocess.PIPE ) data = dict( name='BoostrappingHost', index=index, dist_log_level=_logger.getEffectiveLevel(), dir=tempdir, authkey=str(authkey), parent_address=address ) pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL) p.stdin.close() # # Copy files to remote directory, returning name of directory # unzip_code = '''" import tempfile, os, sys, tarfile tempdir = tempfile.mkdtemp(prefix='distrib-') os.chdir(tempdir) tf = tarfile.open(fileobj=sys.stdin, mode='r|gz') for ti in tf: tf.extract(ti) print tempdir "''' def copy_to_remote_temporary_directory(host, files): p = subprocess.Popen( ['ssh', host, 'python', '-c', unzip_code], stdout=subprocess.PIPE, stdin=subprocess.PIPE ) tf = tarfile.open(fileobj=p.stdin, mode='w|gz') for name in files: tf.add(name, os.path.basename(name)) tf.close() p.stdin.close() return p.stdout.read().rstrip() # # Code which runs a host manager # def main(): # get data from parent over stdin data = pickle.load(sys.stdin) sys.stdin.close() # set some stuff _logger.setLevel(data['dist_log_level']) forking.prepare(data) # create server for a `HostManager` object server = managers.Server(HostManager._registry, ('', 0), data['authkey']) current_process()._server = server # report server address and number of cpus back to parent conn = connection.Client(data['parent_address'], authkey=data['authkey']) conn.send((data['index'], server.address, slot_count)) conn.close() # set name etc current_process().set_name('Host-%s:%s' % server.address) util._run_after_forkers() # register a cleanup function def cleanup(directory): debug('removing directory %s', directory) shutil.rmtree(directory) debug('shutting down host manager') util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0) # start host manager debug('remote host manager starting in %s', data['dir']) server.serve_forever()