Issue #15989: Fix several occurrences of integer overflow
when result of PyLong_AsLong() narrowed to int without checks. This is a backport of changesets 13e2e44db99d and 525407d89277.
This commit is contained in:
parent
ff12fae80e
commit
441d30fac7
|
@ -26,6 +26,9 @@ PyAPI_FUNC(Py_ssize_t) PyLong_AsSsize_t(PyObject *);
|
|||
PyAPI_FUNC(size_t) PyLong_AsSize_t(PyObject *);
|
||||
PyAPI_FUNC(unsigned long) PyLong_AsUnsignedLong(PyObject *);
|
||||
PyAPI_FUNC(unsigned long) PyLong_AsUnsignedLongMask(PyObject *);
|
||||
#ifndef Py_LIMITED_API
|
||||
PyAPI_FUNC(int) _PyLong_AsInt(PyObject *);
|
||||
#endif
|
||||
PyAPI_FUNC(PyObject *) PyLong_GetInfo(void);
|
||||
|
||||
/* It may be useful in the future. I've added it in the PyInt -> PyLong
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import unittest
|
||||
from ctypes import *
|
||||
from struct import calcsize
|
||||
import _testcapi
|
||||
|
||||
class SubclassesTest(unittest.TestCase):
|
||||
def test_subclass(self):
|
||||
|
@ -199,6 +200,14 @@ class StructureTestCase(unittest.TestCase):
|
|||
"_pack_": -1}
|
||||
self.assertRaises(ValueError, type(Structure), "X", (Structure,), d)
|
||||
|
||||
# Issue 15989
|
||||
d = {"_fields_": [("a", c_byte)],
|
||||
"_pack_": _testcapi.INT_MAX + 1}
|
||||
self.assertRaises(ValueError, type(Structure), "X", (Structure,), d)
|
||||
d = {"_fields_": [("a", c_byte)],
|
||||
"_pack_": _testcapi.UINT_MAX + 2}
|
||||
self.assertRaises(ValueError, type(Structure), "X", (Structure,), d)
|
||||
|
||||
def test_initializers(self):
|
||||
class Person(Structure):
|
||||
_fields_ = [("name", c_char*6),
|
||||
|
|
|
@ -5,6 +5,7 @@ Common tests shared by test_str, test_unicode, test_userstring and test_string.
|
|||
import unittest, string, sys, struct
|
||||
from test import support
|
||||
from collections import UserList
|
||||
import _testcapi
|
||||
|
||||
class Sequence:
|
||||
def __init__(self, seq='wxyz'): self.seq = seq
|
||||
|
@ -1142,6 +1143,16 @@ class MixinStrUnicodeUserStringTest:
|
|||
self.checkraises(TypeError, '%10.*f', '__mod__', ('foo', 42.))
|
||||
self.checkraises(ValueError, '%10', '__mod__', (42,))
|
||||
|
||||
self.checkraises(OverflowError, '%*s', '__mod__',
|
||||
(_testcapi.PY_SSIZE_T_MAX + 1, ''))
|
||||
self.checkraises(OverflowError, '%.*f', '__mod__',
|
||||
(_testcapi.INT_MAX + 1, 1. / 7))
|
||||
# Issue 15989
|
||||
self.checkraises(OverflowError, '%*s', '__mod__',
|
||||
(1 << (_testcapi.PY_SSIZE_T_MAX.bit_length() + 1), ''))
|
||||
self.checkraises(OverflowError, '%.*f', '__mod__',
|
||||
(_testcapi.UINT_MAX + 1, 1. / 7))
|
||||
|
||||
class X(object): pass
|
||||
self.checkraises(TypeError, 'abc', '__mod__', X())
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ OS/2+EMX doesn't support the file locking operations.
|
|||
import os
|
||||
import struct
|
||||
import sys
|
||||
import _testcapi
|
||||
import unittest
|
||||
from test.support import verbose, TESTFN, unlink, run_unittest, import_module
|
||||
|
||||
|
@ -76,6 +77,26 @@ class TestFcntl(unittest.TestCase):
|
|||
rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
|
||||
self.f.close()
|
||||
|
||||
def test_fcntl_bad_file(self):
|
||||
class F:
|
||||
def __init__(self, fn):
|
||||
self.fn = fn
|
||||
def fileno(self):
|
||||
return self.fn
|
||||
self.assertRaises(ValueError, fcntl.fcntl, -1, fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.assertRaises(ValueError, fcntl.fcntl, F(-1), fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.assertRaises(TypeError, fcntl.fcntl, 'spam', fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.assertRaises(TypeError, fcntl.fcntl, F('spam'), fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
# Issue 15989
|
||||
self.assertRaises(OverflowError, fcntl.fcntl, _testcapi.INT_MAX + 1,
|
||||
fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.assertRaises(OverflowError, fcntl.fcntl, F(_testcapi.INT_MAX + 1),
|
||||
fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.assertRaises(OverflowError, fcntl.fcntl, _testcapi.INT_MIN - 1,
|
||||
fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
self.assertRaises(OverflowError, fcntl.fcntl, F(_testcapi.INT_MIN - 1),
|
||||
fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
|
||||
def test_fcntl_64_bit(self):
|
||||
# Issue #1309352: fcntl shouldn't fail when the third arg fits in a
|
||||
# C 'long' but not in a C 'int'.
|
||||
|
|
|
@ -7,6 +7,7 @@ import unittest
|
|||
from array import array
|
||||
from weakref import proxy
|
||||
from functools import wraps
|
||||
import _testcapi
|
||||
|
||||
from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd
|
||||
from collections import UserList
|
||||
|
@ -346,6 +347,9 @@ class OtherFileTests(unittest.TestCase):
|
|||
if sys.platform == 'win32':
|
||||
import msvcrt
|
||||
self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
|
||||
# Issue 15989
|
||||
self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
|
||||
self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
|
||||
|
||||
def testBadModeArgument(self):
|
||||
# verify that we get a sensible error message for bad mode argument
|
||||
|
|
|
@ -31,6 +31,7 @@ import signal
|
|||
import errno
|
||||
import warnings
|
||||
import pickle
|
||||
import _testcapi
|
||||
from itertools import cycle, count
|
||||
from collections import deque, UserList
|
||||
from test import support
|
||||
|
@ -1903,6 +1904,14 @@ class TextIOWrapperTest(unittest.TestCase):
|
|||
t.write("A\rB")
|
||||
self.assertEqual(r.getvalue(), b"XY\nZA\rB")
|
||||
|
||||
# Issue 15989
|
||||
def test_device_encoding(self):
|
||||
b = self.BytesIO()
|
||||
b.fileno = lambda: _testcapi.INT_MAX + 1
|
||||
self.assertRaises(OverflowError, self.TextIOWrapper, b)
|
||||
b.fileno = lambda: _testcapi.UINT_MAX + 1
|
||||
self.assertRaises(OverflowError, self.TextIOWrapper, b)
|
||||
|
||||
def test_encoding(self):
|
||||
# Check the encoding attribute is always set, and valid
|
||||
b = self.BytesIO()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Test case for the os.poll() function
|
||||
|
||||
import os, select, random, unittest
|
||||
import _testcapi
|
||||
from test.support import TESTFN, run_unittest
|
||||
|
||||
try:
|
||||
|
@ -150,6 +151,15 @@ class PollTests(unittest.TestCase):
|
|||
if x != 5:
|
||||
self.fail('Overflow must have occurred')
|
||||
|
||||
pollster = select.poll()
|
||||
# Issue 15989
|
||||
self.assertRaises(OverflowError, pollster.register, 0,
|
||||
_testcapi.SHRT_MAX + 1)
|
||||
self.assertRaises(OverflowError, pollster.register, 0,
|
||||
_testcapi.USHRT_MAX + 1)
|
||||
self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1)
|
||||
self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1)
|
||||
|
||||
def test_main():
|
||||
run_unittest(PollTests)
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import errno
|
|||
import io
|
||||
import socket
|
||||
import select
|
||||
import _testcapi
|
||||
import time
|
||||
import traceback
|
||||
import queue
|
||||
|
@ -850,11 +851,17 @@ class GeneralModuleTests(unittest.TestCase):
|
|||
self.assertRaises(ValueError, fp.writable)
|
||||
self.assertRaises(ValueError, fp.seekable)
|
||||
|
||||
def testListenBacklog0(self):
|
||||
def test_listen_backlog(self):
|
||||
for backlog in 0, -1:
|
||||
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
srv.bind((HOST, 0))
|
||||
srv.listen(backlog)
|
||||
srv.close()
|
||||
|
||||
# Issue 15989
|
||||
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
srv.bind((HOST, 0))
|
||||
# backlog = 0
|
||||
srv.listen(0)
|
||||
self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
|
||||
srv.close()
|
||||
|
||||
@unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
|
||||
|
@ -954,6 +961,11 @@ class BasicTCPTest(SocketConnectedTest):
|
|||
|
||||
def _testShutdown(self):
|
||||
self.serv_conn.send(MSG)
|
||||
# Issue 15989
|
||||
self.assertRaises(OverflowError, self.serv_conn.shutdown,
|
||||
_testcapi.INT_MAX + 1)
|
||||
self.assertRaises(OverflowError, self.serv_conn.shutdown,
|
||||
2 + (_testcapi.UINT_MAX + 1))
|
||||
self.serv_conn.shutdown(2)
|
||||
|
||||
def testDetach(self):
|
||||
|
@ -1067,7 +1079,10 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
|||
|
||||
def testSetBlocking(self):
|
||||
# Testing whether set blocking works
|
||||
self.serv.setblocking(0)
|
||||
self.serv.setblocking(True)
|
||||
self.assertIsNone(self.serv.gettimeout())
|
||||
self.serv.setblocking(False)
|
||||
self.assertEqual(self.serv.gettimeout(), 0.0)
|
||||
start = time.time()
|
||||
try:
|
||||
self.serv.accept()
|
||||
|
@ -1075,6 +1090,10 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
|||
pass
|
||||
end = time.time()
|
||||
self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
|
||||
# Issue 15989
|
||||
if _testcapi.UINT_MAX < _testcapi.ULONG_MAX:
|
||||
self.serv.setblocking(_testcapi.UINT_MAX + 1)
|
||||
self.assertIsNone(self.serv.gettimeout())
|
||||
|
||||
def _testSetBlocking(self):
|
||||
pass
|
||||
|
|
|
@ -335,7 +335,7 @@ PyCStructUnionType_update_stgdict(PyObject *type, PyObject *fields, int isStruct
|
|||
|
||||
isPacked = PyObject_GetAttrString(type, "_pack_");
|
||||
if (isPacked) {
|
||||
pack = PyLong_AsLong(isPacked);
|
||||
pack = _PyLong_AsInt(isPacked);
|
||||
if (pack < 0 || PyErr_Occurred()) {
|
||||
Py_XDECREF(isPacked);
|
||||
PyErr_SetString(PyExc_ValueError,
|
||||
|
|
|
@ -303,7 +303,8 @@ io_open(PyObject *self, PyObject *args, PyObject *kwds)
|
|||
int text = 0, binary = 0, universal = 0;
|
||||
|
||||
char rawmode[5], *m;
|
||||
int line_buffering, isatty;
|
||||
int line_buffering;
|
||||
long isatty;
|
||||
|
||||
PyObject *raw, *modeobj = NULL, *buffer = NULL, *wrapper = NULL;
|
||||
|
||||
|
@ -441,12 +442,12 @@ io_open(PyObject *self, PyObject *args, PyObject *kwds)
|
|||
#ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
|
||||
{
|
||||
struct stat st;
|
||||
long fileno;
|
||||
int fileno;
|
||||
PyObject *res = PyObject_CallMethod(raw, "fileno", NULL);
|
||||
if (res == NULL)
|
||||
goto error;
|
||||
|
||||
fileno = PyLong_AsLong(res);
|
||||
fileno = _PyLong_AsInt(res);
|
||||
Py_DECREF(res);
|
||||
if (fileno == -1 && PyErr_Occurred())
|
||||
goto error;
|
||||
|
|
|
@ -240,7 +240,7 @@ fileio_init(PyObject *oself, PyObject *args, PyObject *kwds)
|
|||
return -1;
|
||||
}
|
||||
|
||||
fd = PyLong_AsLong(nameobj);
|
||||
fd = _PyLong_AsInt(nameobj);
|
||||
if (fd < 0) {
|
||||
if (!PyErr_Occurred()) {
|
||||
PyErr_SetString(PyExc_ValueError,
|
||||
|
|
|
@ -340,10 +340,13 @@ update_ufd_array(pollObject *self)
|
|||
|
||||
i = pos = 0;
|
||||
while (PyDict_Next(self->dict, &pos, &key, &value)) {
|
||||
self->ufds[i].fd = PyLong_AsLong(key);
|
||||
assert(i < self->ufd_len);
|
||||
/* Never overflow */
|
||||
self->ufds[i].fd = (int)PyLong_AsLong(key);
|
||||
self->ufds[i].events = (short)PyLong_AsLong(value);
|
||||
i++;
|
||||
}
|
||||
assert(i == self->ufd_len);
|
||||
self->ufd_uptodate = 1;
|
||||
return 1;
|
||||
}
|
||||
|
@ -359,10 +362,11 @@ static PyObject *
|
|||
poll_register(pollObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *o, *key, *value;
|
||||
int fd, events = POLLIN | POLLPRI | POLLOUT;
|
||||
int fd;
|
||||
short events = POLLIN | POLLPRI | POLLOUT;
|
||||
int err;
|
||||
|
||||
if (!PyArg_ParseTuple(args, "O|i:register", &o, &events)) {
|
||||
if (!PyArg_ParseTuple(args, "O|h:register", &o, &events)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -501,7 +505,7 @@ poll_poll(pollObject *self, PyObject *args)
|
|||
tout = PyNumber_Long(tout);
|
||||
if (!tout)
|
||||
return NULL;
|
||||
timeout = PyLong_AsLong(tout);
|
||||
timeout = _PyLong_AsInt(tout);
|
||||
Py_DECREF(tout);
|
||||
if (timeout == -1 && PyErr_Occurred())
|
||||
return NULL;
|
||||
|
|
|
@ -1737,7 +1737,7 @@ For IP sockets, the address info is a pair (hostaddr, port).");
|
|||
static PyObject *
|
||||
sock_setblocking(PySocketSockObject *s, PyObject *arg)
|
||||
{
|
||||
int block;
|
||||
long block;
|
||||
|
||||
block = PyLong_AsLong(arg);
|
||||
if (block == -1 && PyErr_Occurred())
|
||||
|
@ -2219,7 +2219,7 @@ sock_listen(PySocketSockObject *s, PyObject *arg)
|
|||
int backlog;
|
||||
int res;
|
||||
|
||||
backlog = PyLong_AsLong(arg);
|
||||
backlog = _PyLong_AsInt(arg);
|
||||
if (backlog == -1 && PyErr_Occurred())
|
||||
return NULL;
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
|
@ -2822,7 +2822,7 @@ sock_shutdown(PySocketSockObject *s, PyObject *arg)
|
|||
int how;
|
||||
int res;
|
||||
|
||||
how = PyLong_AsLong(arg);
|
||||
how = _PyLong_AsInt(arg);
|
||||
if (how == -1 && PyErr_Occurred())
|
||||
return NULL;
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
|
|
|
@ -200,7 +200,7 @@ PyObject_AsFileDescriptor(PyObject *o)
|
|||
PyObject *meth;
|
||||
|
||||
if (PyLong_Check(o)) {
|
||||
fd = PyLong_AsLong(o);
|
||||
fd = _PyLong_AsInt(o);
|
||||
}
|
||||
else if ((meth = PyObject_GetAttrString(o, "fileno")) != NULL)
|
||||
{
|
||||
|
@ -210,7 +210,7 @@ PyObject_AsFileDescriptor(PyObject *o)
|
|||
return -1;
|
||||
|
||||
if (PyLong_Check(fno)) {
|
||||
fd = PyLong_AsLong(fno);
|
||||
fd = _PyLong_AsInt(fno);
|
||||
Py_DECREF(fno);
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -424,6 +424,24 @@ PyLong_AsLong(PyObject *obj)
|
|||
return result;
|
||||
}
|
||||
|
||||
/* Get a C int from a long int object or any object that has an __int__
|
||||
method. Return -1 and set an error if overflow occurs. */
|
||||
|
||||
int
|
||||
_PyLong_AsInt(PyObject *obj)
|
||||
{
|
||||
int overflow;
|
||||
long result = PyLong_AsLongAndOverflow(obj, &overflow);
|
||||
if (overflow || result > INT_MAX || result < INT_MIN) {
|
||||
/* XXX: could be cute and give a different
|
||||
message for overflow == -1 */
|
||||
PyErr_SetString(PyExc_OverflowError,
|
||||
"Python int too large to convert to C int");
|
||||
return -1;
|
||||
}
|
||||
return (int)result;
|
||||
}
|
||||
|
||||
/* Get a Py_ssize_t from a long int object.
|
||||
Returns -1 and sets an error condition if overflow occurs. */
|
||||
|
||||
|
|
|
@ -9640,7 +9640,7 @@ PyObject *PyUnicode_Format(PyObject *format,
|
|||
"* wants int");
|
||||
goto onError;
|
||||
}
|
||||
width = PyLong_AsLong(v);
|
||||
width = PyLong_AsSsize_t(v);
|
||||
if (width == -1 && PyErr_Occurred())
|
||||
goto onError;
|
||||
if (width < 0) {
|
||||
|
@ -9677,7 +9677,7 @@ PyObject *PyUnicode_Format(PyObject *format,
|
|||
"* wants int");
|
||||
goto onError;
|
||||
}
|
||||
prec = PyLong_AsLong(v);
|
||||
prec = _PyLong_AsInt(v);
|
||||
if (prec == -1 && PyErr_Occurred())
|
||||
goto onError;
|
||||
if (prec < 0)
|
||||
|
|
Loading…
Reference in New Issue