mirror of https://github.com/python/cpython
Apply Amaury's patch to multiprocessing for issue 3125, removes the copy_reg and replaces it with ForkingPickler.register(), which should resolve the conflict with the global registry/ctypes
This commit is contained in:
parent
a6c5dc07f4
commit
13e9d582fd
|
@ -12,7 +12,7 @@ import signal
|
||||||
|
|
||||||
from multiprocessing import util, process
|
from multiprocessing import util, process
|
||||||
|
|
||||||
__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close']
|
__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
|
||||||
|
|
||||||
#
|
#
|
||||||
# Check that the current thread is spawning a child process
|
# Check that the current thread is spawning a child process
|
||||||
|
@ -25,6 +25,49 @@ def assert_spawning(self):
|
||||||
' through inheritance' % type(self).__name__
|
' through inheritance' % type(self).__name__
|
||||||
)
|
)
|
||||||
|
|
||||||
|
#
|
||||||
|
# Try making some callable types picklable
|
||||||
|
#
|
||||||
|
|
||||||
|
from pickle import Pickler
|
||||||
|
class ForkingPickler(Pickler):
|
||||||
|
dispatch = Pickler.dispatch.copy()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def register(cls, type, reduce):
|
||||||
|
def dispatcher(self, obj):
|
||||||
|
rv = reduce(obj)
|
||||||
|
self.save_reduce(obj=obj, *rv)
|
||||||
|
cls.dispatch[type] = dispatcher
|
||||||
|
|
||||||
|
def _reduce_method(m):
|
||||||
|
if m.im_self is None:
|
||||||
|
return getattr, (m.im_class, m.im_func.func_name)
|
||||||
|
else:
|
||||||
|
return getattr, (m.im_self, m.im_func.func_name)
|
||||||
|
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
|
||||||
|
|
||||||
|
def _reduce_method_descriptor(m):
|
||||||
|
return getattr, (m.__objclass__, m.__name__)
|
||||||
|
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
|
||||||
|
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
|
||||||
|
|
||||||
|
#def _reduce_builtin_function_or_method(m):
|
||||||
|
# return getattr, (m.__self__, m.__name__)
|
||||||
|
#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
|
||||||
|
#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from functools import partial
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
def _reduce_partial(p):
|
||||||
|
return _rebuild_partial, (p.func, p.args, p.keywords or {})
|
||||||
|
def _rebuild_partial(func, args, keywords):
|
||||||
|
return partial(func, *args, **keywords)
|
||||||
|
ForkingPickler.register(partial, _reduce_partial)
|
||||||
|
|
||||||
#
|
#
|
||||||
# Unix
|
# Unix
|
||||||
#
|
#
|
||||||
|
@ -105,16 +148,18 @@ else:
|
||||||
import thread
|
import thread
|
||||||
import msvcrt
|
import msvcrt
|
||||||
import _subprocess
|
import _subprocess
|
||||||
import copy_reg
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from ._multiprocessing import win32, Connection, PipeConnection
|
from ._multiprocessing import win32, Connection, PipeConnection
|
||||||
from .util import Finalize
|
from .util import Finalize
|
||||||
|
|
||||||
try:
|
#try:
|
||||||
from cPickle import dump, load, HIGHEST_PROTOCOL
|
# from cPickle import dump, load, HIGHEST_PROTOCOL
|
||||||
except ImportError:
|
#except ImportError:
|
||||||
from pickle import dump, load, HIGHEST_PROTOCOL
|
from pickle import load, HIGHEST_PROTOCOL
|
||||||
|
|
||||||
|
def dump(obj, file, protocol=None):
|
||||||
|
ForkingPickler(file, protocol).dump(obj)
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
@ -346,9 +391,8 @@ else:
|
||||||
return type(conn), (Popen.duplicate_for_child(conn.fileno()),
|
return type(conn), (Popen.duplicate_for_child(conn.fileno()),
|
||||||
conn.readable, conn.writable)
|
conn.readable, conn.writable)
|
||||||
|
|
||||||
copy_reg.pickle(Connection, reduce_connection)
|
ForkingPickler.register(Connection, reduce_connection)
|
||||||
copy_reg.pickle(PipeConnection, reduce_connection)
|
ForkingPickler.register(PipeConnection, reduce_connection)
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Prepare current process
|
# Prepare current process
|
||||||
|
|
|
@ -18,13 +18,12 @@ import sys
|
||||||
import weakref
|
import weakref
|
||||||
import threading
|
import threading
|
||||||
import array
|
import array
|
||||||
import copy_reg
|
|
||||||
import Queue
|
import Queue
|
||||||
|
|
||||||
from traceback import format_exc
|
from traceback import format_exc
|
||||||
from multiprocessing import Process, current_process, active_children, Pool, util, connection
|
from multiprocessing import Process, current_process, active_children, Pool, util, connection
|
||||||
from multiprocessing.process import AuthenticationString
|
from multiprocessing.process import AuthenticationString
|
||||||
from multiprocessing.forking import exit, Popen, assert_spawning
|
from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
|
||||||
from multiprocessing.util import Finalize, info
|
from multiprocessing.util import Finalize, info
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -38,7 +37,7 @@ except ImportError:
|
||||||
|
|
||||||
def reduce_array(a):
|
def reduce_array(a):
|
||||||
return array.array, (a.typecode, a.tostring())
|
return array.array, (a.typecode, a.tostring())
|
||||||
copy_reg.pickle(array.array, reduce_array)
|
ForkingPickler.register(array.array, reduce_array)
|
||||||
|
|
||||||
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
|
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
|
||||||
|
|
||||||
|
|
|
@ -13,11 +13,10 @@ import os
|
||||||
import sys
|
import sys
|
||||||
import socket
|
import socket
|
||||||
import threading
|
import threading
|
||||||
import copy_reg
|
|
||||||
|
|
||||||
import _multiprocessing
|
import _multiprocessing
|
||||||
from multiprocessing import current_process
|
from multiprocessing import current_process
|
||||||
from multiprocessing.forking import Popen, duplicate, close
|
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
|
||||||
from multiprocessing.util import register_after_fork, debug, sub_debug
|
from multiprocessing.util import register_after_fork, debug, sub_debug
|
||||||
from multiprocessing.connection import Client, Listener
|
from multiprocessing.connection import Client, Listener
|
||||||
|
|
||||||
|
@ -134,7 +133,7 @@ def rebuild_handle(pickled_data):
|
||||||
return new_handle
|
return new_handle
|
||||||
|
|
||||||
#
|
#
|
||||||
# Register `_multiprocessing.Connection` with `copy_reg`
|
# Register `_multiprocessing.Connection` with `ForkingPickler`
|
||||||
#
|
#
|
||||||
|
|
||||||
def reduce_connection(conn):
|
def reduce_connection(conn):
|
||||||
|
@ -147,10 +146,10 @@ def rebuild_connection(reduced_handle, readable, writable):
|
||||||
handle, readable=readable, writable=writable
|
handle, readable=readable, writable=writable
|
||||||
)
|
)
|
||||||
|
|
||||||
copy_reg.pickle(_multiprocessing.Connection, reduce_connection)
|
ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
|
||||||
|
|
||||||
#
|
#
|
||||||
# Register `socket.socket` with `copy_reg`
|
# Register `socket.socket` with `ForkingPickler`
|
||||||
#
|
#
|
||||||
|
|
||||||
def fromfd(fd, family, type_, proto=0):
|
def fromfd(fd, family, type_, proto=0):
|
||||||
|
@ -169,10 +168,10 @@ def rebuild_socket(reduced_handle, family, type_, proto):
|
||||||
close(fd)
|
close(fd)
|
||||||
return _sock
|
return _sock
|
||||||
|
|
||||||
copy_reg.pickle(socket.socket, reduce_socket)
|
ForkingPickler.register(socket.socket, reduce_socket)
|
||||||
|
|
||||||
#
|
#
|
||||||
# Register `_multiprocessing.PipeConnection` with `copy_reg`
|
# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
|
||||||
#
|
#
|
||||||
|
|
||||||
if sys.platform == 'win32':
|
if sys.platform == 'win32':
|
||||||
|
@ -187,4 +186,4 @@ if sys.platform == 'win32':
|
||||||
handle, readable=readable, writable=writable
|
handle, readable=readable, writable=writable
|
||||||
)
|
)
|
||||||
|
|
||||||
copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection)
|
ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
|
||||||
|
|
|
@ -9,10 +9,9 @@
|
||||||
import sys
|
import sys
|
||||||
import ctypes
|
import ctypes
|
||||||
import weakref
|
import weakref
|
||||||
import copy_reg
|
|
||||||
|
|
||||||
from multiprocessing import heap, RLock
|
from multiprocessing import heap, RLock
|
||||||
from multiprocessing.forking import assert_spawning
|
from multiprocessing.forking import assert_spawning, ForkingPickler
|
||||||
|
|
||||||
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
|
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
|
||||||
|
|
||||||
|
@ -127,8 +126,7 @@ def reduce_ctype(obj):
|
||||||
def rebuild_ctype(type_, wrapper, length):
|
def rebuild_ctype(type_, wrapper, length):
|
||||||
if length is not None:
|
if length is not None:
|
||||||
type_ = type_ * length
|
type_ = type_ * length
|
||||||
if sys.platform == 'win32' and type_ not in copy_reg.dispatch_table:
|
ForkingPickler.register(type_, reduce_ctype)
|
||||||
copy_reg.pickle(type_, reduce_ctype)
|
|
||||||
obj = type_.from_address(wrapper.get_address())
|
obj = type_.from_address(wrapper.get_address())
|
||||||
obj._wrapper = wrapper
|
obj._wrapper = wrapper
|
||||||
return obj
|
return obj
|
||||||
|
|
|
@ -8,7 +8,6 @@
|
||||||
|
|
||||||
import itertools
|
import itertools
|
||||||
import weakref
|
import weakref
|
||||||
import copy_reg
|
|
||||||
import atexit
|
import atexit
|
||||||
import threading # we want threading to install it's
|
import threading # we want threading to install it's
|
||||||
# cleanup function before multiprocessing does
|
# cleanup function before multiprocessing does
|
||||||
|
@ -302,35 +301,3 @@ class ForkAwareLocal(threading.local):
|
||||||
register_after_fork(self, lambda obj : obj.__dict__.clear())
|
register_after_fork(self, lambda obj : obj.__dict__.clear())
|
||||||
def __reduce__(self):
|
def __reduce__(self):
|
||||||
return type(self), ()
|
return type(self), ()
|
||||||
|
|
||||||
#
|
|
||||||
# Try making some callable types picklable
|
|
||||||
#
|
|
||||||
|
|
||||||
def _reduce_method(m):
|
|
||||||
if m.im_self is None:
|
|
||||||
return getattr, (m.im_class, m.im_func.func_name)
|
|
||||||
else:
|
|
||||||
return getattr, (m.im_self, m.im_func.func_name)
|
|
||||||
copy_reg.pickle(type(Finalize.__init__), _reduce_method)
|
|
||||||
|
|
||||||
def _reduce_method_descriptor(m):
|
|
||||||
return getattr, (m.__objclass__, m.__name__)
|
|
||||||
copy_reg.pickle(type(list.append), _reduce_method_descriptor)
|
|
||||||
copy_reg.pickle(type(int.__add__), _reduce_method_descriptor)
|
|
||||||
|
|
||||||
def _reduce_builtin_function_or_method(m):
|
|
||||||
return getattr, (m.__self__, m.__name__)
|
|
||||||
copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method)
|
|
||||||
copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method)
|
|
||||||
|
|
||||||
try:
|
|
||||||
from functools import partial
|
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
def _reduce_partial(p):
|
|
||||||
return _rebuild_partial, (p.func, p.args, p.keywords or {})
|
|
||||||
def _rebuild_partial(func, args, keywords):
|
|
||||||
return partial(func, *args, **keywords)
|
|
||||||
copy_reg.pickle(partial, _reduce_partial)
|
|
||||||
|
|
|
@ -63,6 +63,11 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #3125: Remove copy_reg in multiprocessing and replace it with
|
||||||
|
ForkingPickler.register() to resolve conflict with ctypes.
|
||||||
|
|
||||||
|
- Issue #3090: Fixed ARCHFLAGS parsing on OS/X
|
||||||
|
|
||||||
- Issue #3313: Fixed a crash when a failed dlopen() call does not set
|
- Issue #3313: Fixed a crash when a failed dlopen() call does not set
|
||||||
a valid dlerror() message.
|
a valid dlerror() message.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue