diff --git a/Doc/library/concurrency.rst b/Doc/library/concurrency.rst index 826bf86d081..39cd9ff4826 100644 --- a/Doc/library/concurrency.rst +++ b/Doc/library/concurrency.rst @@ -15,6 +15,7 @@ multitasking). Here's an overview: threading.rst multiprocessing.rst + multiprocessing.shared_memory.rst concurrent.rst concurrent.futures.rst subprocess.rst diff --git a/Doc/library/multiprocessing.shared_memory.rst b/Doc/library/multiprocessing.shared_memory.rst new file mode 100644 index 00000000000..426fef62fa7 --- /dev/null +++ b/Doc/library/multiprocessing.shared_memory.rst @@ -0,0 +1,343 @@ +:mod:`multiprocessing.shared_memory` --- Provides shared memory for direct access across processes +=================================================================================================== + +.. module:: multiprocessing.shared_memory + :synopsis: Provides shared memory for direct access across processes. + +**Source code:** :source:`Lib/multiprocessing/shared_memory.py` + +.. versionadded:: 3.8 + +.. index:: + single: Shared Memory + single: POSIX Shared Memory + single: Named Shared Memory + +-------------- + +This module provides a class, :class:`SharedMemory`, for the allocation +and management of shared memory to be accessed by one or more processes +on a multicore or symmetric multiprocessor (SMP) machine. To assist with +the life-cycle management of shared memory especially across distinct +processes, a :class:`~multiprocessing.managers.BaseManager` subclass, +:class:`SharedMemoryManager`, is also provided in the +``multiprocessing.managers`` module. + +In this module, shared memory refers to "System V style" shared memory blocks +(though is not necessarily implemented explicitly as such) and does not refer +to "distributed shared memory". This style of shared memory permits distinct +processes to potentially read and write to a common (or shared) region of +volatile memory. Processes are conventionally limited to only have access to +their own process memory space but shared memory permits the sharing +of data between processes, avoiding the need to instead send messages between +processes containing that data. Sharing data directly via memory can provide +significant performance benefits compared to sharing data via disk or socket +or other communications requiring the serialization/deserialization and +copying of data. + + +.. class:: SharedMemory(name=None, create=False, size=0) + + Creates a new shared memory block or attaches to an existing shared + memory block. Each shared memory block is assigned a unique name. + In this way, one process can create a shared memory block with a + particular name and a different process can attach to that same shared + memory block using that same name. + + As a resource for sharing data across processes, shared memory blocks + may outlive the original process that created them. When one process + no longer needs access to a shared memory block that might still be + needed by other processes, the :meth:`close()` method should be called. + When a shared memory block is no longer needed by any process, the + :meth:`unlink()` method should be called to ensure proper cleanup. + + *name* is the unique name for the requested shared memory, specified as + a string. When creating a new shared memory block, if ``None`` (the + default) is supplied for the name, a novel name will be generated. + + *create* controls whether a new shared memory block is created (``True``) + or an existing shared memory block is attached (``False``). + + *size* specifies the requested number of bytes when creating a new shared + memory block. Because some platforms choose to allocate chunks of memory + based upon that platform's memory page size, the exact size of the shared + memory block may be larger or equal to the size requested. When attaching + to an existing shared memory block, the ``size`` parameter is ignored. + + .. method:: close() + + Closes access to the shared memory from this instance. In order to + ensure proper cleanup of resources, all instances should call + ``close()`` once the instance is no longer needed. Note that calling + ``close()`` does not cause the shared memory block itself to be + destroyed. + + .. method:: unlink() + + Requests that the underlying shared memory block be destroyed. In + order to ensure proper cleanup of resources, ``unlink()`` should be + called once (and only once) across all processes which have need + for the shared memory block. After requesting its destruction, a + shared memory block may or may not be immediately destroyed and + this behavior may differ across platforms. Attempts to access data + inside the shared memory block after ``unlink()`` has been called may + result in memory access errors. Note: the last process relinquishing + its hold on a shared memory block may call ``unlink()`` and + :meth:`close()` in either order. + + .. attribute:: buf + + A memoryview of contents of the shared memory block. + + .. attribute:: name + + Read-only access to the unique name of the shared memory block. + + .. attribute:: size + + Read-only access to size in bytes of the shared memory block. + + +The following example demonstrates low-level use of :class:`SharedMemory` +instances:: + + >>> from multiprocessing import shared_memory + >>> shm_a = shared_memory.SharedMemory(create=True, size=10) + >>> type(shm_a.buf) + + >>> buffer = shm_a.buf + >>> len(buffer) + 10 + >>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once + >>> buffer[4] = 100 # Modify single byte at a time + >>> # Attach to an existing shared memory block + >>> shm_b = shared_memory.SharedMemory(shm_a.name) + >>> import array + >>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array + array('b', [22, 33, 44, 55, 100]) + >>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes + >>> bytes(shm_a.buf[:5]) # Access via shm_a + b'howdy' + >>> shm_b.close() # Close each SharedMemory instance + >>> shm_a.close() + >>> shm_a.unlink() # Call unlink only once to release the shared memory + + + +The following example demonstrates a practical use of the :class:`SharedMemory` +class with `NumPy arrays `_, accessing the +same ``numpy.ndarray`` from two distinct Python shells: + +.. doctest:: + :options: +SKIP + + >>> # In the first Python interactive shell + >>> import numpy as np + >>> a = np.array([1, 1, 2, 3, 5, 8]) # Start with an existing NumPy array + >>> from multiprocessing import shared_memory + >>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes) + >>> # Now create a NumPy array backed by shared memory + >>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) + >>> b[:] = a[:] # Copy the original data into shared memory + >>> b + array([1, 1, 2, 3, 5, 8]) + >>> type(b) + + >>> type(a) + + >>> shm.name # We did not specify a name so one was chosen for us + 'psm_21467_46075' + + >>> # In either the same shell or a new Python shell on the same machine + >>> import numpy as np + >>> from multiprocessing import shared_memory + >>> # Attach to the existing shared memory block + >>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075') + >>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example + >>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf) + >>> c + array([1, 1, 2, 3, 5, 8]) + >>> c[-1] = 888 + >>> c + array([ 1, 1, 2, 3, 5, 888]) + + >>> # Back in the first Python interactive shell, b reflects this change + >>> b + array([ 1, 1, 2, 3, 5, 888]) + + >>> # Clean up from within the second Python shell + >>> del c # Unnecessary; merely emphasizing the array is no longer used + >>> existing_shm.close() + + >>> # Clean up from within the first Python shell + >>> del b # Unnecessary; merely emphasizing the array is no longer used + >>> shm.close() + >>> shm.unlink() # Free and release the shared memory block at the very end + + +.. class:: SharedMemoryManager([address[, authkey]]) + + A subclass of :class:`~multiprocessing.managers.BaseManager` which can be + used for the management of shared memory blocks across processes. + + A call to :meth:`~multiprocessing.managers.BaseManager.start` on a + :class:`SharedMemoryManager` instance causes a new process to be started. + This new process's sole purpose is to manage the life cycle + of all shared memory blocks created through it. To trigger the release + of all shared memory blocks managed by that process, call + :meth:`~multiprocessing.managers.BaseManager.shutdown()` on the instance. + This triggers a :meth:`SharedMemory.unlink()` call on all of the + :class:`SharedMemory` objects managed by that process and then + stops the process itself. By creating ``SharedMemory`` instances + through a ``SharedMemoryManager``, we avoid the need to manually track + and trigger the freeing of shared memory resources. + + This class provides methods for creating and returning :class:`SharedMemory` + instances and for creating a list-like object (:class:`ShareableList`) + backed by shared memory. + + Refer to :class:`multiprocessing.managers.BaseManager` for a description + of the inherited *address* and *authkey* optional input arguments and how + they may be used to connect to an existing ``SharedMemoryManager`` service + from other processes. + + .. method:: SharedMemory(size) + + Create and return a new :class:`SharedMemory` object with the + specified ``size`` in bytes. + + .. method:: ShareableList(sequence) + + Create and return a new :class:`ShareableList` object, initialized + by the values from the input ``sequence``. + + +The following example demonstrates the basic mechanisms of a +:class:`SharedMemoryManager`: + +.. doctest:: + :options: +SKIP + + >>> from multiprocessing import shared_memory + >>> smm = shared_memory.SharedMemoryManager() + >>> smm.start() # Start the process that manages the shared memory blocks + >>> sl = smm.ShareableList(range(4)) + >>> sl + ShareableList([0, 1, 2, 3], name='psm_6572_7512') + >>> raw_shm = smm.SharedMemory(size=128) + >>> another_sl = smm.ShareableList('alpha') + >>> another_sl + ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221') + >>> smm.shutdown() # Calls unlink() on sl, raw_shm, and another_sl + +The following example depicts a potentially more convenient pattern for using +:class:`SharedMemoryManager` objects via the :keyword:`with` statement to +ensure that all shared memory blocks are released after they are no longer +needed: + +.. doctest:: + :options: +SKIP + + >>> with shared_memory.SharedMemoryManager() as smm: + ... sl = smm.ShareableList(range(2000)) + ... # Divide the work among two processes, storing partial results in sl + ... p1 = Process(target=do_work, args=(sl, 0, 1000)) + ... p2 = Process(target=do_work, args=(sl, 1000, 2000)) + ... p1.start() + ... p2.start() # A multiprocessing.Pool might be more efficient + ... p1.join() + ... p2.join() # Wait for all work to complete in both processes + ... total_result = sum(sl) # Consolidate the partial results now in sl + +When using a :class:`SharedMemoryManager` in a :keyword:`with` statement, the +shared memory blocks created using that manager are all released when the +:keyword:`with` statement's code block finishes execution. + + +.. class:: ShareableList(sequence=None, *, name=None) + + Provides a mutable list-like object where all values stored within are + stored in a shared memory block. This constrains storable values to + only the ``int``, ``float``, ``bool``, ``str`` (less than 10M bytes each), + ``bytes`` (less than 10M bytes each), and ``None`` built-in data types. + It also notably differs from the built-in ``list`` type in that these + lists can not change their overall length (i.e. no append, insert, etc.) + and do not support the dynamic creation of new :class:`ShareableList` + instances via slicing. + + *sequence* is used in populating a new ``ShareableList`` full of values. + Set to ``None`` to instead attach to an already existing + ``ShareableList`` by its unique shared memory name. + + *name* is the unique name for the requested shared memory, as described + in the definition for :class:`SharedMemory`. When attaching to an + existing ``ShareableList``, specify its shared memory block's unique + name while leaving ``sequence`` set to ``None``. + + .. method:: count(value) + + Returns the number of occurrences of ``value``. + + .. method:: index(value) + + Returns first index position of ``value``. Raises :exc:`ValueError` if + ``value`` is not present. + + .. attribute:: format + + Read-only attribute containing the :mod:`struct` packing format used by + all currently stored values. + + .. attribute:: shm + + The :class:`SharedMemory` instance where the values are stored. + + +The following example demonstrates basic use of a :class:`ShareableList` +instance: + + >>> from multiprocessing import shared_memory + >>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42]) + >>> [ type(entry) for entry in a ] + [, , , , , , ] + >>> a[2] + -273.154 + >>> a[2] = -78.5 + >>> a[2] + -78.5 + >>> a[2] = 'dry ice' # Changing data types is supported as well + >>> a[2] + 'dry ice' + >>> a[2] = 'larger than previously allocated storage space' + Traceback (most recent call last): + ... + ValueError: exceeds available storage for existing str + >>> a[2] + 'dry ice' + >>> len(a) + 7 + >>> a.index(42) + 6 + >>> a.count(b'howdy') + 0 + >>> a.count(b'HoWdY') + 1 + >>> a.shm.close() + >>> a.shm.unlink() + >>> del a # Use of a ShareableList after call to unlink() is unsupported + +The following example depicts how one, two, or many processes may access the +same :class:`ShareableList` by supplying the name of the shared memory block +behind it: + + >>> b = shared_memory.ShareableList(range(5)) # In a first process + >>> c = shared_memory.ShareableList(name=b.shm.name) # In a second process + >>> c + ShareableList([0, 1, 2, 3, 4], name='...') + >>> c[-1] = -999 + >>> b[-1] + -999 + >>> b.shm.close() + >>> c.shm.close() + >>> c.shm.unlink() + diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 4ae8ddc7701..7973012b98d 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1,5 +1,5 @@ # -# Module providing the `SyncManager` class for dealing +# Module providing manager classes for dealing # with shared objects # # multiprocessing/managers.py @@ -8,7 +8,8 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] +__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token', + 'SharedMemoryManager' ] # # Imports @@ -19,6 +20,7 @@ import threading import array import queue import time +from os import getpid from traceback import format_exc @@ -28,6 +30,11 @@ from . import pool from . import process from . import util from . import get_context +try: + from . import shared_memory + HAS_SHMEM = True +except ImportError: + HAS_SHMEM = False # # Register some things for pickling @@ -1200,3 +1207,143 @@ SyncManager.register('Namespace', Namespace, NamespaceProxy) # types returned by methods of PoolProxy SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) SyncManager.register('AsyncResult', create_method=False) + +# +# Definition of SharedMemoryManager and SharedMemoryServer +# + +if HAS_SHMEM: + class _SharedMemoryTracker: + "Manages one or more shared memory segments." + + def __init__(self, name, segment_names=[]): + self.shared_memory_context_name = name + self.segment_names = segment_names + + def register_segment(self, segment_name): + "Adds the supplied shared memory block name to tracker." + util.debug(f"Register segment {segment_name!r} in pid {getpid()}") + self.segment_names.append(segment_name) + + def destroy_segment(self, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the list of blocks being tracked.""" + util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") + self.segment_names.remove(segment_name) + segment = shared_memory.SharedMemory(segment_name) + segment.close() + segment.unlink() + + def unlink(self): + "Calls destroy_segment() on all tracked shared memory blocks." + for segment_name in self.segment_names[:]: + self.destroy_segment(segment_name) + + def __del__(self): + util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") + self.unlink() + + def __getstate__(self): + return (self.shared_memory_context_name, self.segment_names) + + def __setstate__(self, state): + self.__init__(*state) + + + class SharedMemoryServer(Server): + + public = Server.public + \ + ['track_segment', 'release_segment', 'list_segments'] + + def __init__(self, *args, **kwargs): + Server.__init__(self, *args, **kwargs) + self.shared_memory_context = \ + _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}") + util.debug(f"SharedMemoryServer started by pid {getpid()}") + + def create(self, c, typeid, *args, **kwargs): + """Create a new distributed-shared object (not backed by a shared + memory block) and return its id to be used in a Proxy Object.""" + # Unless set up as a shared proxy, don't make shared_memory_context + # a standard part of kwargs. This makes things easier for supplying + # simple functions. + if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): + kwargs['shared_memory_context'] = self.shared_memory_context + return Server.create(self, c, typeid, *args, **kwargs) + + def shutdown(self, c): + "Call unlink() on all tracked shared memory, terminate the Server." + self.shared_memory_context.unlink() + return Server.shutdown(self, c) + + def track_segment(self, c, segment_name): + "Adds the supplied shared memory block name to Server's tracker." + self.shared_memory_context.register_segment(segment_name) + + def release_segment(self, c, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the tracker instance inside the Server.""" + self.shared_memory_context.destroy_segment(segment_name) + + def list_segments(self, c): + """Returns a list of names of shared memory blocks that the Server + is currently tracking.""" + return self.shared_memory_context.segment_names + + + class SharedMemoryManager(BaseManager): + """Like SyncManager but uses SharedMemoryServer instead of Server. + + It provides methods for creating and returning SharedMemory instances + and for creating a list-like object (ShareableList) backed by shared + memory. It also provides methods that create and return Proxy Objects + that support synchronization across processes (i.e. multi-process-safe + locks and semaphores). + """ + + _Server = SharedMemoryServer + + def __init__(self, *args, **kwargs): + BaseManager.__init__(self, *args, **kwargs) + util.debug(f"{self.__class__.__name__} created by pid {getpid()}") + + def __del__(self): + util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") + pass + + def get_server(self): + 'Better than monkeypatching for now; merge into Server ultimately' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started SharedMemoryServer") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("SharedMemoryManager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return self._Server(self._registry, self._address, + self._authkey, self._serializer) + + def SharedMemory(self, size): + """Returns a new SharedMemory instance with the specified size in + bytes, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sms = shared_memory.SharedMemory(None, create=True, size=size) + try: + dispatch(conn, None, 'track_segment', (sms.name,)) + except BaseException as e: + sms.unlink() + raise e + return sms + + def ShareableList(self, sequence): + """Returns a new ShareableList instance populated with the values + from the input sequence, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sl = shared_memory.ShareableList(sequence) + try: + dispatch(conn, None, 'track_segment', (sl.shm.name,)) + except BaseException as e: + sl.shm.unlink() + raise e + return sl diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index 11eac4bf0e3..e4fe822cc64 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -1,228 +1,234 @@ -"Provides shared memory for direct access across processes." +"""Provides shared memory for direct access across processes. + +The API of this package is currently provisional. Refer to the +documentation for details. +""" -__all__ = [ 'SharedMemory', 'PosixSharedMemory', 'WindowsNamedSharedMemory', - 'ShareableList', 'shareable_wrap', - 'SharedMemoryServer', 'SharedMemoryManager', 'SharedMemoryTracker' ] +__all__ = [ 'SharedMemory', 'ShareableList' ] -from functools import reduce +from functools import partial import mmap -from .managers import DictProxy, SyncManager, Server -from . import util import os -import random +import errno import struct -import sys -try: - from _posixshmem import _PosixSharedMemory, Error, ExistentialError, O_CREX -except ImportError as ie: - if os.name != "nt": - # On Windows, posixshmem is not required to be available. - raise ie - else: - _PosixSharedMemory = object - class ExistentialError(BaseException): pass - class Error(BaseException): pass - O_CREX = -1 +import secrets + +if os.name == "nt": + import _winapi + _USE_POSIX = False +else: + import _posixshmem + _USE_POSIX = True -class WindowsNamedSharedMemory: +_O_CREX = os.O_CREAT | os.O_EXCL - def __init__(self, name, flags=None, mode=None, size=None, read_only=False): - if name is None: - name = f'wnsm_{os.getpid()}_{random.randrange(100000)}' +# FreeBSD (and perhaps other BSDs) limit names to 14 characters. +_SHM_SAFE_NAME_LENGTH = 14 - self._mmap = mmap.mmap(-1, size, tagname=name) - self.buf = memoryview(self._mmap) - self.name = name - self.size = size - - def __repr__(self): - return f'{self.__class__.__name__}({self.name!r}, size={self.size})' - - def close(self): - self.buf.release() - self._mmap.close() - - def unlink(self): - """Windows ensures that destruction of the last reference to this - named shared memory block will result in the release of this memory.""" - pass +# Shared memory block name prefix +if _USE_POSIX: + _SHM_NAME_PREFIX = 'psm_' +else: + _SHM_NAME_PREFIX = 'wnsm_' -class PosixSharedMemory(_PosixSharedMemory): - - def __init__(self, name, flags=None, mode=None, size=None, read_only=False): - if name and (flags is None): - _PosixSharedMemory.__init__(self, name) - else: - if name is None: - name = f'psm_{os.getpid()}_{random.randrange(100000)}' - _PosixSharedMemory.__init__(self, name, flags=O_CREX, size=size) - - self._mmap = mmap.mmap(self.fd, self.size) - self.buf = memoryview(self._mmap) - - def __repr__(self): - return f'{self.__class__.__name__}({self.name!r}, size={self.size})' - - def close(self): - self.buf.release() - self._mmap.close() - self.close_fd() +def _make_filename(): + "Create a random filename for the shared memory object." + # number of random bytes to use for name + nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2 + assert nbytes >= 2, '_SHM_NAME_PREFIX too long' + name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes) + assert len(name) <= _SHM_SAFE_NAME_LENGTH + return name class SharedMemory: + """Creates a new shared memory block or attaches to an existing + shared memory block. - def __new__(cls, *args, **kwargs): - if os.name == 'nt': - cls = WindowsNamedSharedMemory - else: - cls = PosixSharedMemory - return cls(*args, **kwargs) + Every shared memory block is assigned a unique name. This enables + one process to create a shared memory block with a particular name + so that a different process can attach to that same shared memory + block using that same name. + As a resource for sharing data across processes, shared memory blocks + may outlive the original process that created them. When one process + no longer needs access to a shared memory block that might still be + needed by other processes, the close() method should be called. + When a shared memory block is no longer needed by any process, the + unlink() method should be called to ensure proper cleanup.""" -def shareable_wrap( - existing_obj=None, - shmem_name=None, - cls=None, - shape=(0,), - strides=None, - dtype=None, - format=None, - **kwargs -): - augmented_kwargs = dict(kwargs) - extras = dict(shape=shape, strides=strides, dtype=dtype, format=format) - for key, value in extras.items(): - if value is not None: - augmented_kwargs[key] = value + # Defaults; enables close() and unlink() to run without errors. + _name = None + _fd = -1 + _mmap = None + _buf = None + _flags = os.O_RDWR + _mode = 0o600 - if existing_obj is not None: - existing_type = getattr( - existing_obj, - "_proxied_type", - type(existing_obj) - ) + def __init__(self, name=None, create=False, size=0): + if not size >= 0: + raise ValueError("'size' must be a positive integer") + if create: + self._flags = _O_CREX | os.O_RDWR + if name is None and not self._flags & os.O_EXCL: + raise ValueError("'name' can only be None if create=True") - #agg = existing_obj.itemsize - #size = [ agg := i * agg for i in existing_obj.shape ][-1] - # TODO: replace use of reduce below with above 2 lines once available - size = reduce( - lambda x, y: x * y, - existing_obj.shape, - existing_obj.itemsize - ) + if _USE_POSIX: - else: - assert shmem_name is not None - existing_type = cls - size = 1 + # POSIX Shared Memory - shm = SharedMemory(shmem_name, size=size) - - class CustomShareableProxy(existing_type): - - def __init__(self, *args, buffer=None, **kwargs): - # If copy method called, prevent recursion from replacing _shm. - if not hasattr(self, "_shm"): - self._shm = shm - self._proxied_type = existing_type + if name is None: + while True: + name = _make_filename() + try: + self._fd = _posixshmem.shm_open( + name, + self._flags, + mode=self._mode + ) + except FileExistsError: + continue + self._name = name + break else: - # _proxied_type only used in pickling. - assert hasattr(self, "_proxied_type") + self._fd = _posixshmem.shm_open( + name, + self._flags, + mode=self._mode + ) + self._name = name try: - existing_type.__init__(self, *args, **kwargs) - except: - pass + if create and size: + os.ftruncate(self._fd, size) + stats = os.fstat(self._fd) + size = stats.st_size + self._mmap = mmap.mmap(self._fd, size) + except OSError: + self.unlink() + raise - def __repr__(self): - if not hasattr(self, "_shm"): - return existing_type.__repr__(self) - formatted_pairs = ( - "%s=%r" % kv for kv in self._build_state(self).items() - ) - return f"{self.__class__.__name__}({', '.join(formatted_pairs)})" + else: - #def __getstate__(self): - # if not hasattr(self, "_shm"): - # return existing_type.__getstate__(self) - # state = self._build_state(self) - # return state + # Windows Named Shared Memory - #def __setstate__(self, state): - # self.__init__(**state) + if create: + while True: + temp_name = _make_filename() if name is None else name + # Create and reserve shared memory block with this name + # until it can be attached to by mmap. + h_map = _winapi.CreateFileMapping( + _winapi.INVALID_HANDLE_VALUE, + _winapi.NULL, + _winapi.PAGE_READWRITE, + (size >> 32) & 0xFFFFFFFF, + size & 0xFFFFFFFF, + temp_name + ) + try: + last_error_code = _winapi.GetLastError() + if last_error_code == _winapi.ERROR_ALREADY_EXISTS: + if name is not None: + raise FileExistsError( + errno.EEXIST, + os.strerror(errno.EEXIST), + name, + _winapi.ERROR_ALREADY_EXISTS + ) + else: + continue + self._mmap = mmap.mmap(-1, size, tagname=temp_name) + finally: + _winapi.CloseHandle(h_map) + self._name = temp_name + break - def __reduce__(self): - return ( - shareable_wrap, - ( - None, - self._shm.name, - self._proxied_type, - self.shape, - self.strides, - self.dtype.str if hasattr(self, "dtype") else None, - getattr(self, "format", None), - ), - ) - - def copy(self): - dupe = existing_type.copy(self) - if not hasattr(dupe, "_shm"): - dupe = shareable_wrap(dupe) - return dupe - - @staticmethod - def _build_state(existing_obj, generics_only=False): - state = { - "shape": existing_obj.shape, - "strides": existing_obj.strides, - } - try: - state["dtype"] = existing_obj.dtype - except AttributeError: + else: + self._name = name + # Dynamically determine the existing named shared memory + # block's size which is likely a multiple of mmap.PAGESIZE. + h_map = _winapi.OpenFileMapping( + _winapi.FILE_MAP_READ, + False, + name + ) try: - state["format"] = existing_obj.format - except AttributeError: - pass - if not generics_only: - try: - state["shmem_name"] = existing_obj._shm.name - state["cls"] = existing_type - except AttributeError: - pass - return state + p_buf = _winapi.MapViewOfFile( + h_map, + _winapi.FILE_MAP_READ, + 0, + 0, + 0 + ) + finally: + _winapi.CloseHandle(h_map) + size = _winapi.VirtualQuerySize(p_buf) + self._mmap = mmap.mmap(-1, size, tagname=name) - proxy_type = type( - f"{existing_type.__name__}Shareable", - CustomShareableProxy.__bases__, - dict(CustomShareableProxy.__dict__), - ) + self._size = size + self._buf = memoryview(self._mmap) - if existing_obj is not None: + def __del__(self): try: - proxy_obj = proxy_type( - buffer=shm.buf, - **proxy_type._build_state(existing_obj) - ) - except Exception: - proxy_obj = proxy_type( - buffer=shm.buf, - **proxy_type._build_state(existing_obj, True) - ) + self.close() + except OSError: + pass - mveo = memoryview(existing_obj) - proxy_obj._shm.buf[:mveo.nbytes] = mveo.tobytes() + def __reduce__(self): + return ( + self.__class__, + ( + self.name, + False, + self.size, + ), + ) - else: - proxy_obj = proxy_type(buffer=shm.buf, **augmented_kwargs) + def __repr__(self): + return f'{self.__class__.__name__}({self.name!r}, size={self.size})' - return proxy_obj + @property + def buf(self): + "A memoryview of contents of the shared memory block." + return self._buf + + @property + def name(self): + "Unique name that identifies the shared memory block." + return self._name + + @property + def size(self): + "Size in bytes." + return self._size + + def close(self): + """Closes access to the shared memory from this instance but does + not destroy the shared memory block.""" + if self._buf is not None: + self._buf.release() + self._buf = None + if self._mmap is not None: + self._mmap.close() + self._mmap = None + if _USE_POSIX and self._fd >= 0: + os.close(self._fd) + self._fd = -1 + + def unlink(self): + """Requests that the underlying shared memory block be destroyed. + + In order to ensure proper cleanup of resources, unlink should be + called once (and only once) across all processes which have access + to the shared memory block.""" + if _USE_POSIX and self.name: + _posixshmem.shm_unlink(self.name) -encoding = "utf8" +_encoding = "utf8" class ShareableList: """Pattern for a mutable list-like object shareable via a shared @@ -234,8 +240,7 @@ class ShareableList: packing format for any storable value must require no more than 8 characters to describe its format.""" - # TODO: Adjust for discovered word size of machine. - types_mapping = { + _types_mapping = { int: "q", float: "d", bool: "xxxxxxx?", @@ -243,17 +248,17 @@ class ShareableList: bytes: "%ds", None.__class__: "xxxxxx?x", } - alignment = 8 - back_transform_codes = { + _alignment = 8 + _back_transforms_mapping = { 0: lambda value: value, # int, float, bool - 1: lambda value: value.rstrip(b'\x00').decode(encoding), # str + 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str 2: lambda value: value.rstrip(b'\x00'), # bytes 3: lambda _value: None, # None } @staticmethod def _extract_recreation_code(value): - """Used in concert with back_transform_codes to convert values + """Used in concert with _back_transforms_mapping to convert values into the appropriate Python objects when retrieving them from the list as well as when storing them.""" if not isinstance(value, (str, bytes, None.__class__)): @@ -265,36 +270,42 @@ class ShareableList: else: return 3 # NoneType - def __init__(self, iterable=None, name=None): - if iterable is not None: + def __init__(self, sequence=None, *, name=None): + if sequence is not None: _formats = [ - self.types_mapping[type(item)] + self._types_mapping[type(item)] if not isinstance(item, (str, bytes)) - else self.types_mapping[type(item)] % ( - self.alignment * (len(item) // self.alignment + 1), + else self._types_mapping[type(item)] % ( + self._alignment * (len(item) // self._alignment + 1), ) - for item in iterable + for item in sequence ] self._list_len = len(_formats) assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len self._allocated_bytes = tuple( - self.alignment if fmt[-1] != "s" else int(fmt[:-1]) + self._alignment if fmt[-1] != "s" else int(fmt[:-1]) for fmt in _formats ) - _back_transform_codes = [ - self._extract_recreation_code(item) for item in iterable + _recreation_codes = [ + self._extract_recreation_code(item) for item in sequence ] requested_size = struct.calcsize( - "q" + self._format_size_metainfo + "".join(_formats) + "q" + self._format_size_metainfo + + "".join(_formats) + + self._format_packing_metainfo + + self._format_back_transform_codes ) else: - requested_size = 1 # Some platforms require > 0. + requested_size = 8 # Some platforms require > 0. - self.shm = SharedMemory(name, size=requested_size) + if name is not None and sequence is None: + self.shm = SharedMemory(name) + else: + self.shm = SharedMemory(name, create=True, size=requested_size) - if iterable is not None: - _enc = encoding + if sequence is not None: + _enc = _encoding struct.pack_into( "q" + self._format_size_metainfo, self.shm.buf, @@ -306,7 +317,7 @@ class ShareableList: "".join(_formats), self.shm.buf, self._offset_data_start, - *(v.encode(_enc) if isinstance(v, str) else v for v in iterable) + *(v.encode(_enc) if isinstance(v, str) else v for v in sequence) ) struct.pack_into( self._format_packing_metainfo, @@ -318,7 +329,7 @@ class ShareableList: self._format_back_transform_codes, self.shm.buf, self._offset_back_transform_codes, - *(_back_transform_codes) + *(_recreation_codes) ) else: @@ -341,7 +352,7 @@ class ShareableList: self._offset_packing_formats + position * 8 )[0] fmt = v.rstrip(b'\x00') - fmt_as_str = fmt.decode(encoding) + fmt_as_str = fmt.decode(_encoding) return fmt_as_str @@ -357,7 +368,7 @@ class ShareableList: self.shm.buf, self._offset_back_transform_codes + position )[0] - transform_function = self.back_transform_codes[transform_code] + transform_function = self._back_transforms_mapping[transform_code] return transform_function @@ -373,7 +384,7 @@ class ShareableList: "8s", self.shm.buf, self._offset_packing_formats + position * 8, - fmt_as_str.encode(encoding) + fmt_as_str.encode(_encoding) ) transform_code = self._extract_recreation_code(value) @@ -410,14 +421,14 @@ class ShareableList: raise IndexError("assignment index out of range") if not isinstance(value, (str, bytes)): - new_format = self.types_mapping[type(value)] + new_format = self._types_mapping[type(value)] else: if len(value) > self._allocated_bytes[position]: raise ValueError("exceeds available storage for existing str") if current_format[-1] == "s": new_format = current_format else: - new_format = self.types_mapping[str] % ( + new_format = self._types_mapping[str] % ( self._allocated_bytes[position], ) @@ -426,16 +437,24 @@ class ShareableList: new_format, value ) - value = value.encode(encoding) if isinstance(value, str) else value + value = value.encode(_encoding) if isinstance(value, str) else value struct.pack_into(new_format, self.shm.buf, offset, value) + def __reduce__(self): + return partial(self.__class__, name=self.shm.name), () + def __len__(self): return struct.unpack_from("q", self.shm.buf, 0)[0] + def __repr__(self): + return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})' + @property def format(self): "The struct packing format used by all currently stored values." - return "".join(self._get_packing_format(i) for i in range(self._list_len)) + return "".join( + self._get_packing_format(i) for i in range(self._list_len) + ) @property def _format_size_metainfo(self): @@ -464,12 +483,6 @@ class ShareableList: def _offset_back_transform_codes(self): return self._offset_packing_formats + self._list_len * 8 - @classmethod - def copy(cls, self): - "L.copy() -> ShareableList -- a shallow copy of L." - - return cls(self) - def count(self, value): "L.count(value) -> integer -- return number of occurrences of value." @@ -484,90 +497,3 @@ class ShareableList: return position else: raise ValueError(f"{value!r} not in this container") - - -class SharedMemoryTracker: - "Manages one or more shared memory segments." - - def __init__(self, name, segment_names=[]): - self.shared_memory_context_name = name - self.segment_names = segment_names - - def register_segment(self, segment): - util.debug(f"Registering segment {segment.name!r} in pid {os.getpid()}") - self.segment_names.append(segment.name) - - def destroy_segment(self, segment_name): - util.debug(f"Destroying segment {segment_name!r} in pid {os.getpid()}") - self.segment_names.remove(segment_name) - segment = SharedMemory(segment_name, size=1) - segment.close() - segment.unlink() - - def unlink(self): - for segment_name in self.segment_names[:]: - self.destroy_segment(segment_name) - - def __del__(self): - util.debug(f"Called {self.__class__.__name__}.__del__ in {os.getpid()}") - self.unlink() - - def __getstate__(self): - return (self.shared_memory_context_name, self.segment_names) - - def __setstate__(self, state): - self.__init__(*state) - - def wrap(self, obj_exposing_buffer_protocol): - wrapped_obj = shareable_wrap(obj_exposing_buffer_protocol) - self.register_segment(wrapped_obj._shm) - return wrapped_obj - - -class SharedMemoryServer(Server): - def __init__(self, *args, **kwargs): - Server.__init__(self, *args, **kwargs) - self.shared_memory_context = \ - SharedMemoryTracker(f"shmm_{self.address}_{os.getpid()}") - util.debug(f"SharedMemoryServer started by pid {os.getpid()}") - - def create(self, c, typeid, *args, **kwargs): - # Unless set up as a shared proxy, don't make shared_memory_context - # a standard part of kwargs. This makes things easier for supplying - # simple functions. - if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): - kwargs['shared_memory_context'] = self.shared_memory_context - return Server.create(self, c, typeid, *args, **kwargs) - - def shutdown(self, c): - self.shared_memory_context.unlink() - return Server.shutdown(self, c) - - -class SharedMemoryManager(SyncManager): - """Like SyncManager but uses SharedMemoryServer instead of Server. - - TODO: Consider relocate/merge into managers submodule.""" - - _Server = SharedMemoryServer - - def __init__(self, *args, **kwargs): - SyncManager.__init__(self, *args, **kwargs) - util.debug(f"{self.__class__.__name__} created by pid {os.getpid()}") - - def __del__(self): - util.debug(f"{self.__class__.__name__} told die by pid {os.getpid()}") - pass - - def get_server(self): - 'Better than monkeypatching for now; merge into Server ultimately' - if self._state.value != State.INITIAL: - if self._state.value == State.STARTED: - raise ProcessError("Already started server") - elif self._state.value == State.SHUTDOWN: - raise ProcessError("Manager has shut down") - else: - raise ProcessError( - "Unknown state {!r}".format(self._state.value)) - return _Server(self._registry, self._address, - self._authkey, self._serializer) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 81db2c98705..a860d9db44f 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -19,6 +19,7 @@ import random import logging import struct import operator +import pickle import weakref import warnings import test.support @@ -53,6 +54,12 @@ try: except ImportError: HAS_SHAREDCTYPES = False +try: + from multiprocessing import shared_memory + HAS_SHMEM = True +except ImportError: + HAS_SHMEM = False + try: import msvcrt except ImportError: @@ -3610,6 +3617,263 @@ class _TestSharedCTypes(BaseTestCase): self.assertAlmostEqual(bar.y, 5.0) self.assertEqual(bar.z, 2 ** 33) + +@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") +class _TestSharedMemory(BaseTestCase): + + ALLOWED_TYPES = ('processes',) + + @staticmethod + def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): + if isinstance(shmem_name_or_obj, str): + local_sms = shared_memory.SharedMemory(shmem_name_or_obj) + else: + local_sms = shmem_name_or_obj + local_sms.buf[:len(binary_data)] = binary_data + local_sms.close() + + def test_shared_memory_basics(self): + sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512) + self.addCleanup(sms.unlink) + + # Verify attributes are readable. + self.assertEqual(sms.name, 'test01_tsmb') + self.assertGreaterEqual(sms.size, 512) + self.assertGreaterEqual(len(sms.buf), sms.size) + + # Modify contents of shared memory segment through memoryview. + sms.buf[0] = 42 + self.assertEqual(sms.buf[0], 42) + + # Attach to existing shared memory segment. + also_sms = shared_memory.SharedMemory('test01_tsmb') + self.assertEqual(also_sms.buf[0], 42) + also_sms.close() + + # Attach to existing shared memory segment but specify a new size. + same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size) + self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. + same_sms.close() + + if shared_memory._USE_POSIX: + # Posix Shared Memory can only be unlinked once. Here we + # test an implementation detail that is not observed across + # all supported platforms (since WindowsNamedSharedMemory + # manages unlinking on its own and unlink() does nothing). + # True release of shared memory segment does not necessarily + # happen until process exits, depending on the OS platform. + with self.assertRaises(FileNotFoundError): + sms_uno = shared_memory.SharedMemory( + 'test01_dblunlink', + create=True, + size=5000 + ) + + try: + self.assertGreaterEqual(sms_uno.size, 5000) + + sms_duo = shared_memory.SharedMemory('test01_dblunlink') + sms_duo.unlink() # First shm_unlink() call. + sms_duo.close() + sms_uno.close() + + finally: + sms_uno.unlink() # A second shm_unlink() call is bad. + + with self.assertRaises(FileExistsError): + # Attempting to create a new shared memory segment with a + # name that is already in use triggers an exception. + there_can_only_be_one_sms = shared_memory.SharedMemory( + 'test01_tsmb', + create=True, + size=512 + ) + + if shared_memory._USE_POSIX: + # Requesting creation of a shared memory segment with the option + # to attach to an existing segment, if that name is currently in + # use, should not trigger an exception. + # Note: Using a smaller size could possibly cause truncation of + # the existing segment but is OS platform dependent. In the + # case of MacOS/darwin, requesting a smaller size is disallowed. + class OptionalAttachSharedMemory(shared_memory.SharedMemory): + _flags = os.O_CREAT | os.O_RDWR + ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb') + self.assertEqual(ok_if_exists_sms.size, sms.size) + ok_if_exists_sms.close() + + # Attempting to attach to an existing shared memory segment when + # no segment exists with the supplied name triggers an exception. + with self.assertRaises(FileNotFoundError): + nonexisting_sms = shared_memory.SharedMemory('test01_notthere') + nonexisting_sms.unlink() # Error should occur on prior line. + + sms.close() + + def test_shared_memory_across_processes(self): + sms = shared_memory.SharedMemory('test02_tsmap', True, size=512) + self.addCleanup(sms.unlink) + + # Verify remote attachment to existing block by name is working. + p = self.Process( + target=self._attach_existing_shmem_then_write, + args=(sms.name, b'howdy') + ) + p.daemon = True + p.start() + p.join() + self.assertEqual(bytes(sms.buf[:5]), b'howdy') + + # Verify pickling of SharedMemory instance also works. + p = self.Process( + target=self._attach_existing_shmem_then_write, + args=(sms, b'HELLO') + ) + p.daemon = True + p.start() + p.join() + self.assertEqual(bytes(sms.buf[:5]), b'HELLO') + + sms.close() + + def test_shared_memory_SharedMemoryManager_basics(self): + smm1 = multiprocessing.managers.SharedMemoryManager() + with self.assertRaises(ValueError): + smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started + smm1.start() + lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] + lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] + doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) + self.assertEqual(len(doppleganger_list0), 5) + doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) + self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) + held_name = lom[0].name + smm1.shutdown() + if sys.platform != "win32": + # Calls to unlink() have no effect on Windows platform; shared + # memory will only be released once final process exits. + with self.assertRaises(FileNotFoundError): + # No longer there to be attached to again. + absent_shm = shared_memory.SharedMemory(name=held_name) + + with multiprocessing.managers.SharedMemoryManager() as smm2: + sl = smm2.ShareableList("howdy") + shm = smm2.SharedMemory(size=128) + held_name = sl.shm.name + if sys.platform != "win32": + with self.assertRaises(FileNotFoundError): + # No longer there to be attached to again. + absent_sl = shared_memory.ShareableList(name=held_name) + + + def test_shared_memory_ShareableList_basics(self): + sl = shared_memory.ShareableList( + ['howdy', b'HoWdY', -273.154, 100, None, True, 42] + ) + self.addCleanup(sl.shm.unlink) + + # Verify attributes are readable. + self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') + + # Exercise len(). + self.assertEqual(len(sl), 7) + + # Exercise index(). + with warnings.catch_warnings(): + # Suppress BytesWarning when comparing against b'HoWdY'. + warnings.simplefilter('ignore') + with self.assertRaises(ValueError): + sl.index('100') + self.assertEqual(sl.index(100), 3) + + # Exercise retrieving individual values. + self.assertEqual(sl[0], 'howdy') + self.assertEqual(sl[-2], True) + + # Exercise iterability. + self.assertEqual( + tuple(sl), + ('howdy', b'HoWdY', -273.154, 100, None, True, 42) + ) + + # Exercise modifying individual values. + sl[3] = 42 + self.assertEqual(sl[3], 42) + sl[4] = 'some' # Change type at a given position. + self.assertEqual(sl[4], 'some') + self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') + with self.assertRaises(ValueError): + sl[4] = 'far too many' # Exceeds available storage. + self.assertEqual(sl[4], 'some') + + # Exercise count(). + with warnings.catch_warnings(): + # Suppress BytesWarning when comparing against b'HoWdY'. + warnings.simplefilter('ignore') + self.assertEqual(sl.count(42), 2) + self.assertEqual(sl.count(b'HoWdY'), 1) + self.assertEqual(sl.count(b'adios'), 0) + + # Exercise creating a duplicate. + sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate') + try: + self.assertNotEqual(sl.shm.name, sl_copy.shm.name) + self.assertEqual('test03_duplicate', sl_copy.shm.name) + self.assertEqual(list(sl), list(sl_copy)) + self.assertEqual(sl.format, sl_copy.format) + sl_copy[-1] = 77 + self.assertEqual(sl_copy[-1], 77) + self.assertNotEqual(sl[-1], 77) + sl_copy.shm.close() + finally: + sl_copy.shm.unlink() + + # Obtain a second handle on the same ShareableList. + sl_tethered = shared_memory.ShareableList(name=sl.shm.name) + self.assertEqual(sl.shm.name, sl_tethered.shm.name) + sl_tethered[-1] = 880 + self.assertEqual(sl[-1], 880) + sl_tethered.shm.close() + + sl.shm.close() + + # Exercise creating an empty ShareableList. + empty_sl = shared_memory.ShareableList() + try: + self.assertEqual(len(empty_sl), 0) + self.assertEqual(empty_sl.format, '') + self.assertEqual(empty_sl.count('any'), 0) + with self.assertRaises(ValueError): + empty_sl.index(None) + empty_sl.shm.close() + finally: + empty_sl.shm.unlink() + + def test_shared_memory_ShareableList_pickling(self): + sl = shared_memory.ShareableList(range(10)) + self.addCleanup(sl.shm.unlink) + + serialized_sl = pickle.dumps(sl) + deserialized_sl = pickle.loads(serialized_sl) + self.assertTrue( + isinstance(deserialized_sl, shared_memory.ShareableList) + ) + self.assertTrue(deserialized_sl[-1], 9) + self.assertFalse(sl is deserialized_sl) + deserialized_sl[4] = "changed" + self.assertEqual(sl[4], "changed") + + # Verify data is not being put into the pickled representation. + name = 'a' * len(sl.shm.name) + larger_sl = shared_memory.ShareableList(range(400)) + self.addCleanup(larger_sl.shm.unlink) + serialized_larger_sl = pickle.dumps(larger_sl) + self.assertTrue(len(serialized_sl) == len(serialized_larger_sl)) + larger_sl.shm.close() + + deserialized_sl.shm.close() + sl.shm.close() + # # # @@ -4779,27 +5043,6 @@ class TestSyncManagerTypes(unittest.TestCase): self.wait_proc_exit() self.assertEqual(self.proc.exitcode, 0) - @classmethod - def _test_queue(cls, obj): - assert obj.qsize() == 2 - assert obj.full() - assert not obj.empty() - assert obj.get() == 5 - assert not obj.empty() - assert obj.get() == 6 - assert obj.empty() - - def test_queue(self, qname="Queue"): - o = getattr(self.manager, qname)(2) - o.put(5) - o.put(6) - self.run_worker(self._test_queue, o) - assert o.empty() - assert not o.full() - - def test_joinable_queue(self): - self.test_queue("JoinableQueue") - @classmethod def _test_event(cls, obj): assert obj.is_set() @@ -4873,6 +5116,27 @@ class TestSyncManagerTypes(unittest.TestCase): o = self.manager.Pool(processes=4) self.run_worker(self._test_pool, o) + @classmethod + def _test_queue(cls, obj): + assert obj.qsize() == 2 + assert obj.full() + assert not obj.empty() + assert obj.get() == 5 + assert not obj.empty() + assert obj.get() == 6 + assert obj.empty() + + def test_queue(self, qname="Queue"): + o = getattr(self.manager, qname)(2) + o.put(5) + o.put(6) + self.run_worker(self._test_queue, o) + assert o.empty() + assert not o.full() + + def test_joinable_queue(self): + self.test_queue("JoinableQueue") + @classmethod def _test_list(cls, obj): assert obj[0] == 5 @@ -4945,18 +5209,6 @@ class TestSyncManagerTypes(unittest.TestCase): self.run_worker(self._test_namespace, o) -try: - import multiprocessing.shared_memory -except ImportError: - @unittest.skip("SharedMemoryManager not available on this platform") - class TestSharedMemoryManagerTypes(TestSyncManagerTypes): - pass -else: - class TestSharedMemoryManagerTypes(TestSyncManagerTypes): - """Same as above but by using SharedMemoryManager.""" - manager_class = multiprocessing.shared_memory.SharedMemoryManager - - class MiscTestCase(unittest.TestCase): def test__all__(self): # Just make sure names in blacklist are excluded diff --git a/Modules/_multiprocessing/clinic/posixshmem.c.h b/Modules/_multiprocessing/clinic/posixshmem.c.h new file mode 100644 index 00000000000..20abddc0a2e --- /dev/null +++ b/Modules/_multiprocessing/clinic/posixshmem.c.h @@ -0,0 +1,92 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +#if defined(HAVE_SHM_OPEN) + +PyDoc_STRVAR(_posixshmem_shm_open__doc__, +"shm_open($module, /, path, flags, mode=511)\n" +"--\n" +"\n" +"Open a shared memory object. Returns a file descriptor (integer)."); + +#define _POSIXSHMEM_SHM_OPEN_METHODDEF \ + {"shm_open", (PyCFunction)(void(*)(void))_posixshmem_shm_open, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_open__doc__}, + +static int +_posixshmem_shm_open_impl(PyObject *module, PyObject *path, int flags, + int mode); + +static PyObject * +_posixshmem_shm_open(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"path", "flags", "mode", NULL}; + static _PyArg_Parser _parser = {"Ui|i:shm_open", _keywords, 0}; + PyObject *path; + int flags; + int mode = 511; + int _return_value; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &path, &flags, &mode)) { + goto exit; + } + _return_value = _posixshmem_shm_open_impl(module, path, flags, mode); + if ((_return_value == -1) && PyErr_Occurred()) { + goto exit; + } + return_value = PyLong_FromLong((long)_return_value); + +exit: + return return_value; +} + +#endif /* defined(HAVE_SHM_OPEN) */ + +#if defined(HAVE_SHM_UNLINK) + +PyDoc_STRVAR(_posixshmem_shm_unlink__doc__, +"shm_unlink($module, /, path)\n" +"--\n" +"\n" +"Remove a shared memory object (similar to unlink()).\n" +"\n" +"Remove a shared memory object name, and, once all processes have unmapped\n" +"the object, de-allocates and destroys the contents of the associated memory\n" +"region."); + +#define _POSIXSHMEM_SHM_UNLINK_METHODDEF \ + {"shm_unlink", (PyCFunction)(void(*)(void))_posixshmem_shm_unlink, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_unlink__doc__}, + +static PyObject * +_posixshmem_shm_unlink_impl(PyObject *module, PyObject *path); + +static PyObject * +_posixshmem_shm_unlink(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"path", NULL}; + static _PyArg_Parser _parser = {"U:shm_unlink", _keywords, 0}; + PyObject *path; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &path)) { + goto exit; + } + return_value = _posixshmem_shm_unlink_impl(module, path); + +exit: + return return_value; +} + +#endif /* defined(HAVE_SHM_UNLINK) */ + +#ifndef _POSIXSHMEM_SHM_OPEN_METHODDEF + #define _POSIXSHMEM_SHM_OPEN_METHODDEF +#endif /* !defined(_POSIXSHMEM_SHM_OPEN_METHODDEF) */ + +#ifndef _POSIXSHMEM_SHM_UNLINK_METHODDEF + #define _POSIXSHMEM_SHM_UNLINK_METHODDEF +#endif /* !defined(_POSIXSHMEM_SHM_UNLINK_METHODDEF) */ +/*[clinic end generated code: output=ff9cf0bc9b8baddf input=a9049054013a1b77]*/ diff --git a/Modules/_multiprocessing/posixshmem.c b/Modules/_multiprocessing/posixshmem.c index 7dd29f405e4..2049dbbc6fa 100644 --- a/Modules/_multiprocessing/posixshmem.c +++ b/Modules/_multiprocessing/posixshmem.c @@ -1,31 +1,5 @@ /* -posixshmem - A Python module for accessing POSIX 1003.1b-1993 shared memory. - -Copyright (c) 2012, Philip Semanchuk -Copyright (c) 2018, 2019, Davin Potts -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - * Neither the name of posixshmem nor the names of its contributors may - be used to endorse or promote products derived from this software - without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY ITS CONTRIBUTORS ''AS IS'' AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL Philip Semanchuk BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +posixshmem - A Python extension that provides shm_open() and shm_unlink() */ #define PY_SSIZE_T_CLEAN @@ -33,603 +7,15 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include "structmember.h" -#include -#include -#include -#include -#include - -// For shared memory stuff -#include +// for shm_open() and shm_unlink() +#ifdef HAVE_SYS_MMAN_H #include - -/* SEM_FAILED is defined as an int in Apple's headers, and this makes the -compiler complain when I compare it to a pointer. Python faced the same -problem (issue 9586) and I copied their solution here. -ref: http://bugs.python.org/issue9586 - -Note that in /Developer/SDKs/MacOSX10.4u.sdk/usr/include/sys/semaphore.h, -SEM_FAILED is #defined as -1 and that's apparently the definition used by -Python when building. In /usr/include/sys/semaphore.h, it's defined -as ((sem_t *)-1). -*/ -#ifdef __APPLE__ - #undef SEM_FAILED - #define SEM_FAILED ((sem_t *)-1) #endif -/* POSIX says that a mode_t "shall be an integer type". To avoid the need -for a specific get_mode function for each type, I'll just stuff the mode into -a long and mention it in the Xxx_members list for each type. -ref: http://www.opengroup.org/onlinepubs/000095399/basedefs/sys/types.h.html -*/ - -typedef struct { - PyObject_HEAD - char *name; - long mode; - int fd; -} SharedMemory; - - -// FreeBSD (and perhaps other BSDs) limit names to 14 characters. In the -// code below, strings of this length are allocated on the stack, so -// increase this gently or change that code to use malloc(). -#define MAX_SAFE_NAME_LENGTH 14 - - -/* Struct to contain an IPC object name which can be None */ -typedef struct { - int is_none; - char *name; -} NoneableName; - - -/* - Exceptions for this module -*/ - -static PyObject *pBaseException; -static PyObject *pPermissionsException; -static PyObject *pExistentialException; - - -#ifdef POSIX_IPC_DEBUG -#define DPRINTF(fmt, args...) fprintf(stderr, "+++ " fmt, ## args) -#else -#define DPRINTF(fmt, args...) -#endif - -static char * -bytes_to_c_string(PyObject* o, int lock) { -/* Convert a bytes object to a char *. Optionally lock the buffer if it is a - bytes array. - This code swiped directly from Python 3.1's posixmodule.c by Philip S. - The name there is bytes2str(). -*/ - if (PyBytes_Check(o)) - return PyBytes_AsString(o); - else if (PyByteArray_Check(o)) { - if (lock && PyObject_GetBuffer(o, NULL, 0) < 0) - /* On a bytearray, this should not fail. */ - PyErr_BadInternalCall(); - return PyByteArray_AsString(o); - } else { - /* The FS converter should have verified that this - is either bytes or bytearray. */ - Py_FatalError("bad object passed to bytes2str"); - /* not reached. */ - return ""; - } -} - -static void -release_bytes(PyObject* o) - /* Release the lock, decref the object. - This code swiped directly from Python 3.1's posixmodule.c by Philip S. - */ -{ - if (PyByteArray_Check(o)) - o->ob_type->tp_as_buffer->bf_releasebuffer(NULL, 0); - Py_DECREF(o); -} - - -static int -random_in_range(int min, int max) { - // returns a random int N such that min <= N <= max - int diff = (max - min) + 1; - - // ref: http://www.c-faq.com/lib/randrange.html - return ((int)((double)rand() / ((double)RAND_MAX + 1) * diff)) + min; -} - - -static -int create_random_name(char *name) { - // The random name is always lowercase so that this code will work - // on case-insensitive file systems. It always starts with a forward - // slash. - int length; - char *alphabet = "abcdefghijklmnopqrstuvwxyz"; - int i; - - // Generate a random length for the name. I subtract 1 from the - // MAX_SAFE_NAME_LENGTH in order to allow for the name's leading "/". - length = random_in_range(6, MAX_SAFE_NAME_LENGTH - 1); - - name[0] = '/'; - name[length] = '\0'; - i = length; - while (--i) - name[i] = alphabet[random_in_range(0, 25)]; - - return length; -} - - -static int -convert_name_param(PyObject *py_name_param, void *checked_name) { - /* Verifies that the py_name_param is either None or a string. - If it's a string, checked_name->name points to a PyMalloc-ed buffer - holding a NULL-terminated C version of the string when this function - concludes. The caller is responsible for releasing the buffer. - */ - int rc = 0; - NoneableName *p_name = (NoneableName *)checked_name; - PyObject *py_name_as_bytes = NULL; - char *p_name_as_c_string = NULL; - - DPRINTF("inside convert_name_param\n"); - DPRINTF("PyBytes_Check() = %d \n", PyBytes_Check(py_name_param)); - DPRINTF("PyString_Check() = %d \n", PyString_Check(py_name_param)); - DPRINTF("PyUnicode_Check() = %d \n", PyUnicode_Check(py_name_param)); - - p_name->is_none = 0; - - // The name can be None or a Python string - if (py_name_param == Py_None) { - DPRINTF("name is None\n"); - rc = 1; - p_name->is_none = 1; - } - else if (PyUnicode_Check(py_name_param) || PyBytes_Check(py_name_param)) { - DPRINTF("name is Unicode or bytes\n"); - // The caller passed me a Unicode string or a byte array; I need a - // char *. Getting from one to the other takes a couple steps. - - if (PyUnicode_Check(py_name_param)) { - DPRINTF("name is Unicode\n"); - // PyUnicode_FSConverter() converts the Unicode object into a - // bytes or a bytearray object. (Why can't it be one or the other?) - PyUnicode_FSConverter(py_name_param, &py_name_as_bytes); - } - else { - DPRINTF("name is bytes\n"); - // Make a copy of the name param. - py_name_as_bytes = PyBytes_FromObject(py_name_param); - } - - // bytes_to_c_string() returns a pointer to the buffer. - p_name_as_c_string = bytes_to_c_string(py_name_as_bytes, 0); - - // PyMalloc memory and copy the user-supplied name to it. - p_name->name = (char *)PyMem_Malloc(strlen(p_name_as_c_string) + 1); - if (p_name->name) { - rc = 1; - strcpy(p_name->name, p_name_as_c_string); - } - else - PyErr_SetString(PyExc_MemoryError, "Out of memory"); - - // The bytes version of the name isn't useful to me, and per the - // documentation for PyUnicode_FSConverter(), I am responsible for - // releasing it when I'm done. - release_bytes(py_name_as_bytes); - } - else - PyErr_SetString(PyExc_TypeError, "Name must be None or a string"); - - return rc; -} - - - -/* ===== Begin Shared Memory implementation functions ===== */ - -static PyObject * -shm_str(SharedMemory *self) { - return PyUnicode_FromString(self->name ? self->name : "(no name)"); -} - -static PyObject * -shm_repr(SharedMemory *self) { - char mode[32]; - - sprintf(mode, "0%o", (int)(self->mode)); - - return PyUnicode_FromFormat("_posixshmem.SharedMemory(\"%s\", mode=%s)", - self->name, mode); -} - -static PyObject * -my_shm_unlink(const char *name) { - DPRINTF("unlinking shm name %s\n", name); - if (-1 == shm_unlink(name)) { - switch (errno) { - case EACCES: - PyErr_SetString(pPermissionsException, "Permission denied"); - break; - - case ENOENT: - PyErr_SetString(pExistentialException, - "No shared memory exists with the specified name"); - break; - - case ENAMETOOLONG: - PyErr_SetString(PyExc_ValueError, "The name is too long"); - break; - - default: - PyErr_SetFromErrno(PyExc_OSError); - break; - } - - goto error_return; - } - - Py_RETURN_NONE; - - error_return: - return NULL; -} - - -static PyObject * -SharedMemory_new(PyTypeObject *type, PyObject *args, PyObject *kwlist) { - SharedMemory *self; - - self = (SharedMemory *)type->tp_alloc(type, 0); - - return (PyObject *)self; -} - - -static int -SharedMemory_init(SharedMemory *self, PyObject *args, PyObject *keywords) { - NoneableName name; - char temp_name[MAX_SAFE_NAME_LENGTH + 1]; - unsigned int flags = 0; - unsigned long size = 0; - int read_only = 0; - static char *keyword_list[ ] = {"name", "flags", "mode", "size", "read_only", NULL}; - - // First things first -- initialize the self struct. - self->name = NULL; - self->fd = 0; - self->mode = 0600; - - if (!PyArg_ParseTupleAndKeywords(args, keywords, "O&|Iiki", keyword_list, - &convert_name_param, &name, &flags, - &(self->mode), &size, &read_only)) - goto error_return; - - if ( !(flags & O_CREAT) && (flags & O_EXCL) ) { - PyErr_SetString(PyExc_ValueError, - "O_EXCL must be combined with O_CREAT"); - goto error_return; - } - - if (name.is_none && ((flags & O_EXCL) != O_EXCL)) { - PyErr_SetString(PyExc_ValueError, - "Name can only be None if O_EXCL is set"); - goto error_return; - } - - flags |= (read_only ? O_RDONLY : O_RDWR); - - if (name.is_none) { - // (name == None) ==> generate a name for the caller - do { - errno = 0; - create_random_name(temp_name); - - DPRINTF("calling shm_open, name=%s, flags=0x%x, mode=0%o\n", - temp_name, flags, (int)self->mode); - self->fd = shm_open(temp_name, flags, (mode_t)self->mode); - - } while ( (-1 == self->fd) && (EEXIST == errno) ); - - // PyMalloc memory and copy the randomly-generated name to it. - self->name = (char *)PyMem_Malloc(strlen(temp_name) + 1); - if (self->name) - strcpy(self->name, temp_name); - else { - PyErr_SetString(PyExc_MemoryError, "Out of memory"); - goto error_return; - } - } - else { - // (name != None) ==> use name supplied by the caller. It was - // already converted to C by convert_name_param(). - self->name = name.name; - - DPRINTF("calling shm_open, name=%s, flags=0x%x, mode=0%o\n", - self->name, flags, (int)self->mode); - self->fd = shm_open(self->name, flags, (mode_t)self->mode); - } - - DPRINTF("shm fd = %d\n", self->fd); - - if (-1 == self->fd) { - self->fd = 0; - switch (errno) { - case EACCES: - PyErr_Format(pPermissionsException, - "No permission to %s this segment", - (flags & O_TRUNC) ? "truncate" : "access" - ); - break; - - case EEXIST: - PyErr_SetString(pExistentialException, - "Shared memory with the specified name already exists"); - break; - - case ENOENT: - PyErr_SetString(pExistentialException, - "No shared memory exists with the specified name"); - break; - - case EINVAL: - PyErr_SetString(PyExc_ValueError, "Invalid parameter(s)"); - break; - - case EMFILE: - PyErr_SetString(PyExc_OSError, - "This process already has the maximum number of files open"); - break; - - case ENFILE: - PyErr_SetString(PyExc_OSError, - "The system limit on the total number of open files has been reached"); - break; - - case ENAMETOOLONG: - PyErr_SetString(PyExc_ValueError, - "The name is too long"); - break; - - default: - PyErr_SetFromErrno(PyExc_OSError); - break; - } - - goto error_return; - } - else { - if (size) { - DPRINTF("calling ftruncate, fd = %d, size = %ld\n", self->fd, size); - if (-1 == ftruncate(self->fd, (off_t)size)) { - // The code below will raise a Python error. Since that error - // is raised during __init__(), it will look to the caller - // as if object creation failed entirely. Here I clean up - // the system object I just created. - close(self->fd); - shm_unlink(self->name); - - // ftruncate can return a ton of different errors, but most - // are not relevant or are extremely unlikely. - switch (errno) { - case EINVAL: - PyErr_SetString(PyExc_ValueError, - "The size is invalid or the memory is read-only"); - break; - - case EFBIG: - PyErr_SetString(PyExc_ValueError, - "The size is too large"); - break; - - case EROFS: - case EACCES: - PyErr_SetString(pPermissionsException, - "The memory is read-only"); - break; - - default: - PyErr_SetFromErrno(PyExc_OSError); - break; - } - - goto error_return; - } - } - } - - return 0; - - error_return: - return -1; -} - - -static void SharedMemory_dealloc(SharedMemory *self) { - DPRINTF("dealloc\n"); - PyMem_Free(self->name); - self->name = NULL; - - Py_TYPE(self)->tp_free((PyObject*)self); -} - - -PyObject * -SharedMemory_getsize(SharedMemory *self, void *closure) { - struct stat fileinfo; - off_t size = -1; - - if (0 == fstat(self->fd, &fileinfo)) - size = fileinfo.st_size; - else { - switch (errno) { - case EBADF: - case EINVAL: - PyErr_SetString(pExistentialException, - "The segment does not exist"); - break; - - default: - PyErr_SetFromErrno(PyExc_OSError); - break; - } - - goto error_return; - } - - return Py_BuildValue("k", (unsigned long)size); - - error_return: - return NULL; -} - - -PyObject * -SharedMemory_close_fd(SharedMemory *self) { - if (self->fd) { - if (-1 == close(self->fd)) { - switch (errno) { - case EBADF: - PyErr_SetString(PyExc_ValueError, - "The file descriptor is invalid"); - break; - - default: - PyErr_SetFromErrno(PyExc_OSError); - break; - } - - goto error_return; - } - } - - Py_RETURN_NONE; - - error_return: - return NULL; -} - - -PyObject * -SharedMemory_unlink(SharedMemory *self) { - return my_shm_unlink(self->name); -} - - -/* ===== End Shared Memory functions ===== */ - - -/* - * - * Shared memory meta stuff for describing myself to Python - * - */ - - -static PyMemberDef SharedMemory_members[] = { - { "name", - T_STRING, - offsetof(SharedMemory, name), - READONLY, - "The name specified in the constructor" - }, - { "fd", - T_INT, - offsetof(SharedMemory, fd), - READONLY, - "Shared memory segment file descriptor" - }, - { "mode", - T_LONG, - offsetof(SharedMemory, mode), - READONLY, - "The mode specified in the constructor" - }, - {NULL} /* Sentinel */ -}; - - -static PyMethodDef SharedMemory_methods[] = { - { "close_fd", - (PyCFunction)SharedMemory_close_fd, - METH_NOARGS, - "Closes the file descriptor associated with the shared memory." - }, - { "unlink", - (PyCFunction)SharedMemory_unlink, - METH_NOARGS, - "Unlink (remove) the shared memory." - }, - {NULL, NULL, 0, NULL} /* Sentinel */ -}; - - -static PyGetSetDef SharedMemory_getseters[] = { - // size is read-only - { "size", - (getter)SharedMemory_getsize, - (setter)NULL, - "size", - NULL - }, - {NULL} /* Sentinel */ -}; - - -static PyTypeObject SharedMemoryType = { - PyVarObject_HEAD_INIT(NULL, 0) - "_posixshmem._PosixSharedMemory", // tp_name - sizeof(SharedMemory), // tp_basicsize - 0, // tp_itemsize - (destructor) SharedMemory_dealloc, // tp_dealloc - 0, // tp_print - 0, // tp_getattr - 0, // tp_setattr - 0, // tp_compare - (reprfunc) shm_repr, // tp_repr - 0, // tp_as_number - 0, // tp_as_sequence - 0, // tp_as_mapping - 0, // tp_hash - 0, // tp_call - (reprfunc) shm_str, // tp_str - 0, // tp_getattro - 0, // tp_setattro - 0, // tp_as_buffer - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, - // tp_flags - "POSIX shared memory object", // tp_doc - 0, // tp_traverse - 0, // tp_clear - 0, // tp_richcompare - 0, // tp_weaklistoffset - 0, // tp_iter - 0, // tp_iternext - SharedMemory_methods, // tp_methods - SharedMemory_members, // tp_members - SharedMemory_getseters, // tp_getset - 0, // tp_base - 0, // tp_dict - 0, // tp_descr_get - 0, // tp_descr_set - 0, // tp_dictoffset - (initproc) SharedMemory_init, // tp_init - 0, // tp_alloc - (newfunc) SharedMemory_new, // tp_new - 0, // tp_free - 0, // tp_is_gc - 0 // tp_bases -}; - +/*[clinic input] +module _posixshmem +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=a416734e49164bf8]*/ /* * @@ -637,23 +23,90 @@ static PyTypeObject SharedMemoryType = { * */ -static PyObject * -posixshmem_unlink_shared_memory(PyObject *self, PyObject *args) { - const char *name; +#ifdef HAVE_SHM_OPEN +/*[clinic input] +_posixshmem.shm_open -> int + path: unicode + flags: int + mode: int = 0o777 - if (!PyArg_ParseTuple(args, "s", &name)) - return NULL; - else - return my_shm_unlink(name); +# "shm_open(path, flags, mode=0o777)\n\n\ + +Open a shared memory object. Returns a file descriptor (integer). + +[clinic start generated code]*/ + +static int +_posixshmem_shm_open_impl(PyObject *module, PyObject *path, int flags, + int mode) +/*[clinic end generated code: output=8d110171a4fa20df input=e83b58fa802fac25]*/ +{ + int fd; + int async_err = 0; + const char *name = PyUnicode_AsUTF8(path); + if (name == NULL) { + return -1; + } + do { + Py_BEGIN_ALLOW_THREADS + fd = shm_open(name, flags, mode); + Py_END_ALLOW_THREADS + } while (fd < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals())); + + if (fd < 0) { + if (!async_err) + PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path); + return -1; + } + + return fd; } +#endif /* HAVE_SHM_OPEN */ +#ifdef HAVE_SHM_UNLINK +/*[clinic input] +_posixshmem.shm_unlink + path: unicode + +Remove a shared memory object (similar to unlink()). + +Remove a shared memory object name, and, once all processes have unmapped +the object, de-allocates and destroys the contents of the associated memory +region. + +[clinic start generated code]*/ + +static PyObject * +_posixshmem_shm_unlink_impl(PyObject *module, PyObject *path) +/*[clinic end generated code: output=42f8b23d134b9ff5 input=8dc0f87143e3b300]*/ +{ + int rv; + int async_err = 0; + const char *name = PyUnicode_AsUTF8(path); + if (name == NULL) { + return NULL; + } + do { + Py_BEGIN_ALLOW_THREADS + rv = shm_unlink(name); + Py_END_ALLOW_THREADS + } while (rv < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals())); + + if (rv < 0) { + if (!async_err) + PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path); + return NULL; + } + + Py_RETURN_NONE; +} +#endif /* HAVE_SHM_UNLINK */ + +#include "clinic/posixshmem.c.h" static PyMethodDef module_methods[ ] = { - { "unlink_shared_memory", - (PyCFunction)posixshmem_unlink_shared_memory, - METH_VARARGS, - "Unlink shared memory" - }, + _POSIXSHMEM_SHM_OPEN_METHODDEF + _POSIXSHMEM_SHM_UNLINK_METHODDEF {NULL} /* Sentinel */ }; @@ -664,61 +117,15 @@ static struct PyModuleDef this_module = { "POSIX shared memory module", // m_doc -1, // m_size (space allocated for module globals) module_methods, // m_methods - NULL, // m_reload - NULL, // m_traverse - NULL, // m_clear - NULL // m_free }; /* Module init function */ PyMODINIT_FUNC PyInit__posixshmem(void) { PyObject *module; - PyObject *module_dict; - - // I call this in case I'm asked to create any random names. - srand((unsigned int)time(NULL)); - module = PyModule_Create(&this_module); - - if (!module) - goto error_return; - - if (PyType_Ready(&SharedMemoryType) < 0) - goto error_return; - - Py_INCREF(&SharedMemoryType); - PyModule_AddObject(module, "_PosixSharedMemory", (PyObject *)&SharedMemoryType); - - - PyModule_AddStringConstant(module, "__copyright__", "Copyright 2012 Philip Semanchuk, 2018-2019 Davin Potts"); - - PyModule_AddIntConstant(module, "O_CREAT", O_CREAT); - PyModule_AddIntConstant(module, "O_EXCL", O_EXCL); - PyModule_AddIntConstant(module, "O_CREX", O_CREAT | O_EXCL); - PyModule_AddIntConstant(module, "O_TRUNC", O_TRUNC); - - if (!(module_dict = PyModule_GetDict(module))) - goto error_return; - - // Exceptions - if (!(pBaseException = PyErr_NewException("_posixshmem.Error", NULL, NULL))) - goto error_return; - else - PyDict_SetItemString(module_dict, "Error", pBaseException); - - if (!(pPermissionsException = PyErr_NewException("_posixshmem.PermissionsError", pBaseException, NULL))) - goto error_return; - else - PyDict_SetItemString(module_dict, "PermissionsError", pPermissionsException); - - if (!(pExistentialException = PyErr_NewException("_posixshmem.ExistentialError", pBaseException, NULL))) - goto error_return; - else - PyDict_SetItemString(module_dict, "ExistentialError", pExistentialException); - + if (!module) { + return NULL; + } return module; - - error_return: - return NULL; } diff --git a/Modules/_winapi.c b/Modules/_winapi.c index cdb45c23e78..e7b221d888e 100644 --- a/Modules/_winapi.c +++ b/Modules/_winapi.c @@ -159,6 +159,7 @@ def create_converter(type_, format_unit): create_converter('HANDLE', '" F_HANDLE "') create_converter('HMODULE', '" F_HANDLE "') create_converter('LPSECURITY_ATTRIBUTES', '" F_POINTER "') +create_converter('LPCVOID', '" F_POINTER "') create_converter('BOOL', 'i') # F_BOOL used previously (always 'i') create_converter('DWORD', 'k') # F_DWORD is always "k" (which is much shorter) @@ -186,8 +187,17 @@ class DWORD_return_converter(CReturnConverter): self.err_occurred_if("_return_value == PY_DWORD_MAX", data) data.return_conversion.append( 'return_value = Py_BuildValue("k", _return_value);\n') + +class LPVOID_return_converter(CReturnConverter): + type = 'LPVOID' + + def render(self, function, data): + self.declare(data) + self.err_occurred_if("_return_value == NULL", data) + data.return_conversion.append( + 'return_value = HANDLE_TO_PYNUM(_return_value);\n') [python start generated code]*/ -/*[python end generated code: output=da39a3ee5e6b4b0d input=27456f8555228b62]*/ +/*[python end generated code: output=da39a3ee5e6b4b0d input=79464c61a31ae932]*/ #include "clinic/_winapi.c.h" @@ -464,6 +474,41 @@ _winapi_CreateFile_impl(PyObject *module, LPCTSTR file_name, return handle; } +/*[clinic input] +_winapi.CreateFileMapping -> HANDLE + + file_handle: HANDLE + security_attributes: LPSECURITY_ATTRIBUTES + protect: DWORD + max_size_high: DWORD + max_size_low: DWORD + name: LPCWSTR + / +[clinic start generated code]*/ + +static HANDLE +_winapi_CreateFileMapping_impl(PyObject *module, HANDLE file_handle, + LPSECURITY_ATTRIBUTES security_attributes, + DWORD protect, DWORD max_size_high, + DWORD max_size_low, LPCWSTR name) +/*[clinic end generated code: output=6c0a4d5cf7f6fcc6 input=3dc5cf762a74dee8]*/ +{ + HANDLE handle; + + Py_BEGIN_ALLOW_THREADS + handle = CreateFileMappingW(file_handle, security_attributes, + protect, max_size_high, max_size_low, + name); + Py_END_ALLOW_THREADS + + if (handle == NULL) { + PyErr_SetFromWindowsErrWithUnicodeFilename(0, name); + handle = INVALID_HANDLE_VALUE; + } + + return handle; +} + /*[clinic input] _winapi.CreateJunction @@ -1295,6 +1340,64 @@ _winapi_GetVersion_impl(PyObject *module) #pragma warning(pop) +/*[clinic input] +_winapi.MapViewOfFile -> LPVOID + + file_map: HANDLE + desired_access: DWORD + file_offset_high: DWORD + file_offset_low: DWORD + number_bytes: size_t + / +[clinic start generated code]*/ + +static LPVOID +_winapi_MapViewOfFile_impl(PyObject *module, HANDLE file_map, + DWORD desired_access, DWORD file_offset_high, + DWORD file_offset_low, size_t number_bytes) +/*[clinic end generated code: output=f23b1ee4823663e3 input=177471073be1a103]*/ +{ + LPVOID address; + + Py_BEGIN_ALLOW_THREADS + address = MapViewOfFile(file_map, desired_access, file_offset_high, + file_offset_low, number_bytes); + Py_END_ALLOW_THREADS + + if (address == NULL) + PyErr_SetFromWindowsErr(0); + + return address; +} + +/*[clinic input] +_winapi.OpenFileMapping -> HANDLE + + desired_access: DWORD + inherit_handle: BOOL + name: LPCWSTR + / +[clinic start generated code]*/ + +static HANDLE +_winapi_OpenFileMapping_impl(PyObject *module, DWORD desired_access, + BOOL inherit_handle, LPCWSTR name) +/*[clinic end generated code: output=08cc44def1cb11f1 input=131f2a405359de7f]*/ +{ + HANDLE handle; + + Py_BEGIN_ALLOW_THREADS + handle = OpenFileMappingW(desired_access, inherit_handle, name); + Py_END_ALLOW_THREADS + + if (handle == NULL) { + PyErr_SetFromWindowsErrWithUnicodeFilename(0, name); + handle = INVALID_HANDLE_VALUE; + } + + return handle; +} + /*[clinic input] _winapi.OpenProcess -> HANDLE @@ -1490,6 +1593,32 @@ _winapi_TerminateProcess_impl(PyObject *module, HANDLE handle, Py_RETURN_NONE; } +/*[clinic input] +_winapi.VirtualQuerySize -> size_t + + address: LPCVOID + / +[clinic start generated code]*/ + +static size_t +_winapi_VirtualQuerySize_impl(PyObject *module, LPCVOID address) +/*[clinic end generated code: output=40c8e0ff5ec964df input=6b784a69755d0bb6]*/ +{ + SIZE_T size_of_buf; + MEMORY_BASIC_INFORMATION mem_basic_info; + SIZE_T region_size; + + Py_BEGIN_ALLOW_THREADS + size_of_buf = VirtualQuery(address, &mem_basic_info, sizeof(mem_basic_info)); + Py_END_ALLOW_THREADS + + if (size_of_buf == 0) + PyErr_SetFromWindowsErr(0); + + region_size = mem_basic_info.RegionSize; + return region_size; +} + /*[clinic input] _winapi.WaitNamedPipe @@ -1719,6 +1848,7 @@ static PyMethodDef winapi_functions[] = { _WINAPI_CLOSEHANDLE_METHODDEF _WINAPI_CONNECTNAMEDPIPE_METHODDEF _WINAPI_CREATEFILE_METHODDEF + _WINAPI_CREATEFILEMAPPING_METHODDEF _WINAPI_CREATENAMEDPIPE_METHODDEF _WINAPI_CREATEPIPE_METHODDEF _WINAPI_CREATEPROCESS_METHODDEF @@ -1731,11 +1861,14 @@ static PyMethodDef winapi_functions[] = { _WINAPI_GETMODULEFILENAME_METHODDEF _WINAPI_GETSTDHANDLE_METHODDEF _WINAPI_GETVERSION_METHODDEF + _WINAPI_MAPVIEWOFFILE_METHODDEF + _WINAPI_OPENFILEMAPPING_METHODDEF _WINAPI_OPENPROCESS_METHODDEF _WINAPI_PEEKNAMEDPIPE_METHODDEF _WINAPI_READFILE_METHODDEF _WINAPI_SETNAMEDPIPEHANDLESTATE_METHODDEF _WINAPI_TERMINATEPROCESS_METHODDEF + _WINAPI_VIRTUALQUERYSIZE_METHODDEF _WINAPI_WAITNAMEDPIPE_METHODDEF _WINAPI_WAITFORMULTIPLEOBJECTS_METHODDEF _WINAPI_WAITFORSINGLEOBJECT_METHODDEF @@ -1799,11 +1932,34 @@ PyInit__winapi(void) WINAPI_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED); WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_READ); WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_WRITE); + WINAPI_CONSTANT(F_DWORD, FILE_MAP_ALL_ACCESS); + WINAPI_CONSTANT(F_DWORD, FILE_MAP_COPY); + WINAPI_CONSTANT(F_DWORD, FILE_MAP_EXECUTE); + WINAPI_CONSTANT(F_DWORD, FILE_MAP_READ); + WINAPI_CONSTANT(F_DWORD, FILE_MAP_WRITE); WINAPI_CONSTANT(F_DWORD, GENERIC_READ); WINAPI_CONSTANT(F_DWORD, GENERIC_WRITE); WINAPI_CONSTANT(F_DWORD, INFINITE); + WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE); + WINAPI_CONSTANT(F_DWORD, MEM_COMMIT); + WINAPI_CONSTANT(F_DWORD, MEM_FREE); + WINAPI_CONSTANT(F_DWORD, MEM_IMAGE); + WINAPI_CONSTANT(F_DWORD, MEM_MAPPED); + WINAPI_CONSTANT(F_DWORD, MEM_PRIVATE); + WINAPI_CONSTANT(F_DWORD, MEM_RESERVE); WINAPI_CONSTANT(F_DWORD, NMPWAIT_WAIT_FOREVER); WINAPI_CONSTANT(F_DWORD, OPEN_EXISTING); + WINAPI_CONSTANT(F_DWORD, PAGE_EXECUTE); + WINAPI_CONSTANT(F_DWORD, PAGE_EXECUTE_READ); + WINAPI_CONSTANT(F_DWORD, PAGE_EXECUTE_READWRITE); + WINAPI_CONSTANT(F_DWORD, PAGE_EXECUTE_WRITECOPY); + WINAPI_CONSTANT(F_DWORD, PAGE_GUARD); + WINAPI_CONSTANT(F_DWORD, PAGE_NOACCESS); + WINAPI_CONSTANT(F_DWORD, PAGE_NOCACHE); + WINAPI_CONSTANT(F_DWORD, PAGE_READONLY); + WINAPI_CONSTANT(F_DWORD, PAGE_READWRITE); + WINAPI_CONSTANT(F_DWORD, PAGE_WRITECOMBINE); + WINAPI_CONSTANT(F_DWORD, PAGE_WRITECOPY); WINAPI_CONSTANT(F_DWORD, PIPE_ACCESS_DUPLEX); WINAPI_CONSTANT(F_DWORD, PIPE_ACCESS_INBOUND); WINAPI_CONSTANT(F_DWORD, PIPE_READMODE_MESSAGE); @@ -1812,6 +1968,12 @@ PyInit__winapi(void) WINAPI_CONSTANT(F_DWORD, PIPE_WAIT); WINAPI_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS); WINAPI_CONSTANT(F_DWORD, PROCESS_DUP_HANDLE); + WINAPI_CONSTANT(F_DWORD, SEC_COMMIT); + WINAPI_CONSTANT(F_DWORD, SEC_IMAGE); + WINAPI_CONSTANT(F_DWORD, SEC_LARGE_PAGES); + WINAPI_CONSTANT(F_DWORD, SEC_NOCACHE); + WINAPI_CONSTANT(F_DWORD, SEC_RESERVE); + WINAPI_CONSTANT(F_DWORD, SEC_WRITECOMBINE); WINAPI_CONSTANT(F_DWORD, STARTF_USESHOWWINDOW); WINAPI_CONSTANT(F_DWORD, STARTF_USESTDHANDLES); WINAPI_CONSTANT(F_DWORD, STD_INPUT_HANDLE); diff --git a/Modules/clinic/_winapi.c.h b/Modules/clinic/_winapi.c.h index f1158a00621..e21f2bc2b6f 100644 --- a/Modules/clinic/_winapi.c.h +++ b/Modules/clinic/_winapi.c.h @@ -168,6 +168,50 @@ exit: return return_value; } +PyDoc_STRVAR(_winapi_CreateFileMapping__doc__, +"CreateFileMapping($module, file_handle, security_attributes, protect,\n" +" max_size_high, max_size_low, name, /)\n" +"--\n" +"\n"); + +#define _WINAPI_CREATEFILEMAPPING_METHODDEF \ + {"CreateFileMapping", (PyCFunction)(void(*)(void))_winapi_CreateFileMapping, METH_FASTCALL, _winapi_CreateFileMapping__doc__}, + +static HANDLE +_winapi_CreateFileMapping_impl(PyObject *module, HANDLE file_handle, + LPSECURITY_ATTRIBUTES security_attributes, + DWORD protect, DWORD max_size_high, + DWORD max_size_low, LPCWSTR name); + +static PyObject * +_winapi_CreateFileMapping(PyObject *module, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject *return_value = NULL; + HANDLE file_handle; + LPSECURITY_ATTRIBUTES security_attributes; + DWORD protect; + DWORD max_size_high; + DWORD max_size_low; + LPCWSTR name; + HANDLE _return_value; + + if (!_PyArg_ParseStack(args, nargs, "" F_HANDLE "" F_POINTER "kkku:CreateFileMapping", + &file_handle, &security_attributes, &protect, &max_size_high, &max_size_low, &name)) { + goto exit; + } + _return_value = _winapi_CreateFileMapping_impl(module, file_handle, security_attributes, protect, max_size_high, max_size_low, name); + if ((_return_value == INVALID_HANDLE_VALUE) && PyErr_Occurred()) { + goto exit; + } + if (_return_value == NULL) { + Py_RETURN_NONE; + } + return_value = HANDLE_TO_PYNUM(_return_value); + +exit: + return return_value; +} + PyDoc_STRVAR(_winapi_CreateJunction__doc__, "CreateJunction($module, src_path, dst_path, /)\n" "--\n" @@ -602,6 +646,83 @@ exit: return return_value; } +PyDoc_STRVAR(_winapi_MapViewOfFile__doc__, +"MapViewOfFile($module, file_map, desired_access, file_offset_high,\n" +" file_offset_low, number_bytes, /)\n" +"--\n" +"\n"); + +#define _WINAPI_MAPVIEWOFFILE_METHODDEF \ + {"MapViewOfFile", (PyCFunction)(void(*)(void))_winapi_MapViewOfFile, METH_FASTCALL, _winapi_MapViewOfFile__doc__}, + +static LPVOID +_winapi_MapViewOfFile_impl(PyObject *module, HANDLE file_map, + DWORD desired_access, DWORD file_offset_high, + DWORD file_offset_low, size_t number_bytes); + +static PyObject * +_winapi_MapViewOfFile(PyObject *module, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject *return_value = NULL; + HANDLE file_map; + DWORD desired_access; + DWORD file_offset_high; + DWORD file_offset_low; + size_t number_bytes; + LPVOID _return_value; + + if (!_PyArg_ParseStack(args, nargs, "" F_HANDLE "kkkO&:MapViewOfFile", + &file_map, &desired_access, &file_offset_high, &file_offset_low, _PyLong_Size_t_Converter, &number_bytes)) { + goto exit; + } + _return_value = _winapi_MapViewOfFile_impl(module, file_map, desired_access, file_offset_high, file_offset_low, number_bytes); + if ((_return_value == NULL) && PyErr_Occurred()) { + goto exit; + } + return_value = HANDLE_TO_PYNUM(_return_value); + +exit: + return return_value; +} + +PyDoc_STRVAR(_winapi_OpenFileMapping__doc__, +"OpenFileMapping($module, desired_access, inherit_handle, name, /)\n" +"--\n" +"\n"); + +#define _WINAPI_OPENFILEMAPPING_METHODDEF \ + {"OpenFileMapping", (PyCFunction)(void(*)(void))_winapi_OpenFileMapping, METH_FASTCALL, _winapi_OpenFileMapping__doc__}, + +static HANDLE +_winapi_OpenFileMapping_impl(PyObject *module, DWORD desired_access, + BOOL inherit_handle, LPCWSTR name); + +static PyObject * +_winapi_OpenFileMapping(PyObject *module, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject *return_value = NULL; + DWORD desired_access; + BOOL inherit_handle; + LPCWSTR name; + HANDLE _return_value; + + if (!_PyArg_ParseStack(args, nargs, "kiu:OpenFileMapping", + &desired_access, &inherit_handle, &name)) { + goto exit; + } + _return_value = _winapi_OpenFileMapping_impl(module, desired_access, inherit_handle, name); + if ((_return_value == INVALID_HANDLE_VALUE) && PyErr_Occurred()) { + goto exit; + } + if (_return_value == NULL) { + Py_RETURN_NONE; + } + return_value = HANDLE_TO_PYNUM(_return_value); + +exit: + return return_value; +} + PyDoc_STRVAR(_winapi_OpenProcess__doc__, "OpenProcess($module, desired_access, inherit_handle, process_id, /)\n" "--\n" @@ -764,6 +885,37 @@ exit: return return_value; } +PyDoc_STRVAR(_winapi_VirtualQuerySize__doc__, +"VirtualQuerySize($module, address, /)\n" +"--\n" +"\n"); + +#define _WINAPI_VIRTUALQUERYSIZE_METHODDEF \ + {"VirtualQuerySize", (PyCFunction)_winapi_VirtualQuerySize, METH_O, _winapi_VirtualQuerySize__doc__}, + +static size_t +_winapi_VirtualQuerySize_impl(PyObject *module, LPCVOID address); + +static PyObject * +_winapi_VirtualQuerySize(PyObject *module, PyObject *arg) +{ + PyObject *return_value = NULL; + LPCVOID address; + size_t _return_value; + + if (!PyArg_Parse(arg, "" F_POINTER ":VirtualQuerySize", &address)) { + goto exit; + } + _return_value = _winapi_VirtualQuerySize_impl(module, address); + if ((_return_value == (size_t)-1) && PyErr_Occurred()) { + goto exit; + } + return_value = PyLong_FromSize_t(_return_value); + +exit: + return return_value; +} + PyDoc_STRVAR(_winapi_WaitNamedPipe__doc__, "WaitNamedPipe($module, name, timeout, /)\n" "--\n" @@ -945,4 +1097,4 @@ _winapi_GetFileType(PyObject *module, PyObject *const *args, Py_ssize_t nargs, P exit: return return_value; } -/*[clinic end generated code: output=5063c84b2d125488 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=f3897898ea1da99d input=a9049054013a1b77]*/