2010-10-03 15:26:07 -03:00
|
|
|
import httplib
|
2015-03-22 16:15:44 -03:00
|
|
|
import itertools
|
2009-09-29 14:48:18 -03:00
|
|
|
import array
|
2001-04-13 11:57:44 -03:00
|
|
|
import StringIO
|
2007-03-23 15:54:07 -03:00
|
|
|
import socket
|
2010-07-23 23:46:16 -03:00
|
|
|
import errno
|
2014-11-23 23:02:02 -04:00
|
|
|
import os
|
2015-05-16 12:58:41 -03:00
|
|
|
import tempfile
|
2003-07-08 09:36:58 -03:00
|
|
|
|
2010-01-02 22:06:07 -04:00
|
|
|
import unittest
|
|
|
|
TestCase = unittest.TestCase
|
2004-08-07 13:28:14 -03:00
|
|
|
|
|
|
|
from test import test_support
|
2001-04-13 11:57:44 -03:00
|
|
|
|
2014-11-23 13:42:45 -04:00
|
|
|
here = os.path.dirname(__file__)
|
|
|
|
# Self-signed cert file for 'localhost'
|
|
|
|
CERT_localhost = os.path.join(here, 'keycert.pem')
|
|
|
|
# Self-signed cert file for 'fakehostname'
|
|
|
|
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
|
|
|
|
# Self-signed cert file for self-signed.pythontest.net
|
|
|
|
CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
|
|
|
|
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
HOST = test_support.HOST
|
|
|
|
|
2001-04-13 11:57:44 -03:00
|
|
|
class FakeSocket:
|
2014-05-16 22:51:46 -03:00
|
|
|
def __init__(self, text, fileclass=StringIO.StringIO, host=None, port=None):
|
2001-04-13 11:57:44 -03:00
|
|
|
self.text = text
|
2003-07-08 09:36:58 -03:00
|
|
|
self.fileclass = fileclass
|
2006-11-12 06:32:47 -04:00
|
|
|
self.data = ''
|
2014-12-01 07:07:28 -04:00
|
|
|
self.file_closed = False
|
2014-05-16 22:51:46 -03:00
|
|
|
self.host = host
|
|
|
|
self.port = port
|
2001-04-13 11:57:44 -03:00
|
|
|
|
2004-08-07 13:28:14 -03:00
|
|
|
def sendall(self, data):
|
2009-09-29 14:48:18 -03:00
|
|
|
self.data += ''.join(data)
|
2004-08-07 13:28:14 -03:00
|
|
|
|
2001-04-13 11:57:44 -03:00
|
|
|
def makefile(self, mode, bufsize=None):
|
|
|
|
if mode != 'r' and mode != 'rb':
|
2002-04-01 15:00:50 -04:00
|
|
|
raise httplib.UnimplementedFileMode()
|
2014-12-01 07:07:28 -04:00
|
|
|
# keep the file around so we can check how much was read from it
|
|
|
|
self.file = self.fileclass(self.text)
|
|
|
|
self.file.close = self.file_close #nerf close ()
|
|
|
|
return self.file
|
|
|
|
|
|
|
|
def file_close(self):
|
|
|
|
self.file_closed = True
|
2003-07-08 09:36:58 -03:00
|
|
|
|
2014-05-16 22:51:46 -03:00
|
|
|
def close(self):
|
|
|
|
pass
|
|
|
|
|
2010-07-23 23:46:16 -03:00
|
|
|
class EPipeSocket(FakeSocket):
|
|
|
|
|
|
|
|
def __init__(self, text, pipe_trigger):
|
|
|
|
# When sendall() is called with pipe_trigger, raise EPIPE.
|
|
|
|
FakeSocket.__init__(self, text)
|
|
|
|
self.pipe_trigger = pipe_trigger
|
|
|
|
|
|
|
|
def sendall(self, data):
|
|
|
|
if self.pipe_trigger in data:
|
|
|
|
raise socket.error(errno.EPIPE, "gotcha")
|
|
|
|
self.data += data
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
pass
|
|
|
|
|
2003-07-08 09:36:58 -03:00
|
|
|
class NoEOFStringIO(StringIO.StringIO):
|
|
|
|
"""Like StringIO, but raises AssertionError on EOF.
|
|
|
|
|
|
|
|
This is used below to test that httplib doesn't try to read
|
|
|
|
more from the underlying file than it should.
|
|
|
|
"""
|
|
|
|
def read(self, n=-1):
|
|
|
|
data = StringIO.StringIO.read(self, n)
|
|
|
|
if data == '':
|
|
|
|
raise AssertionError('caller tried to read past EOF')
|
|
|
|
return data
|
|
|
|
|
|
|
|
def readline(self, length=None):
|
|
|
|
data = StringIO.StringIO.readline(self, length)
|
|
|
|
if data == '':
|
|
|
|
raise AssertionError('caller tried to read past EOF')
|
|
|
|
return data
|
2001-04-13 11:57:44 -03:00
|
|
|
|
2004-08-07 13:28:14 -03:00
|
|
|
|
|
|
|
class HeaderTests(TestCase):
|
|
|
|
def test_auto_headers(self):
|
|
|
|
# Some headers are added automatically, but should not be added by
|
|
|
|
# .request() if they are explicitly set.
|
|
|
|
|
|
|
|
class HeaderCountingBuffer(list):
|
|
|
|
def __init__(self):
|
|
|
|
self.count = {}
|
|
|
|
def append(self, item):
|
|
|
|
kv = item.split(':')
|
|
|
|
if len(kv) > 1:
|
|
|
|
# item is a 'Key: Value' header string
|
|
|
|
lcKey = kv[0].lower()
|
|
|
|
self.count.setdefault(lcKey, 0)
|
|
|
|
self.count[lcKey] += 1
|
|
|
|
list.append(self, item)
|
|
|
|
|
|
|
|
for explicit_header in True, False:
|
|
|
|
for header in 'Content-length', 'Host', 'Accept-encoding':
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket('blahblahblah')
|
|
|
|
conn._buffer = HeaderCountingBuffer()
|
|
|
|
|
|
|
|
body = 'spamspamspam'
|
|
|
|
headers = {}
|
|
|
|
if explicit_header:
|
|
|
|
headers[header] = str(len(body))
|
|
|
|
conn.request('POST', '/', body, headers)
|
|
|
|
self.assertEqual(conn._buffer.count[header.lower()], 1)
|
|
|
|
|
2012-05-19 05:52:21 -03:00
|
|
|
def test_content_length_0(self):
|
|
|
|
|
|
|
|
class ContentLengthChecker(list):
|
|
|
|
def __init__(self):
|
|
|
|
list.__init__(self)
|
|
|
|
self.content_length = None
|
|
|
|
def append(self, item):
|
|
|
|
kv = item.split(':', 1)
|
|
|
|
if len(kv) > 1 and kv[0].lower() == 'content-length':
|
|
|
|
self.content_length = kv[1].strip()
|
|
|
|
list.append(self, item)
|
|
|
|
|
2015-03-22 16:15:44 -03:00
|
|
|
# Here, we're testing that methods expecting a body get a
|
|
|
|
# content-length set to zero if the body is empty (either None or '')
|
|
|
|
bodies = (None, '')
|
|
|
|
methods_with_body = ('PUT', 'POST', 'PATCH')
|
|
|
|
for method, body in itertools.product(methods_with_body, bodies):
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket(None)
|
|
|
|
conn._buffer = ContentLengthChecker()
|
|
|
|
conn.request(method, '/', body)
|
|
|
|
self.assertEqual(
|
|
|
|
conn._buffer.content_length, '0',
|
|
|
|
'Header Content-Length incorrect on {}'.format(method)
|
|
|
|
)
|
|
|
|
|
|
|
|
# For these methods, we make sure that content-length is not set when
|
|
|
|
# the body is None because it might cause unexpected behaviour on the
|
|
|
|
# server.
|
|
|
|
methods_without_body = (
|
|
|
|
'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
|
|
|
|
)
|
|
|
|
for method in methods_without_body:
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket(None)
|
|
|
|
conn._buffer = ContentLengthChecker()
|
|
|
|
conn.request(method, '/', None)
|
|
|
|
self.assertEqual(
|
|
|
|
conn._buffer.content_length, None,
|
|
|
|
'Header Content-Length set for empty body on {}'.format(method)
|
|
|
|
)
|
|
|
|
|
|
|
|
# If the body is set to '', that's considered to be "present but
|
|
|
|
# empty" rather than "missing", so content length would be set, even
|
|
|
|
# for methods that don't expect a body.
|
|
|
|
for method in methods_without_body:
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket(None)
|
|
|
|
conn._buffer = ContentLengthChecker()
|
|
|
|
conn.request(method, '/', '')
|
|
|
|
self.assertEqual(
|
|
|
|
conn._buffer.content_length, '0',
|
|
|
|
'Header Content-Length incorrect on {}'.format(method)
|
|
|
|
)
|
|
|
|
|
|
|
|
# If the body is set, make sure Content-Length is set.
|
|
|
|
for method in itertools.chain(methods_without_body, methods_with_body):
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket(None)
|
|
|
|
conn._buffer = ContentLengthChecker()
|
|
|
|
conn.request(method, '/', ' ')
|
|
|
|
self.assertEqual(
|
|
|
|
conn._buffer.content_length, '1',
|
|
|
|
'Header Content-Length incorrect on {}'.format(method)
|
|
|
|
)
|
2012-05-19 05:52:21 -03:00
|
|
|
|
2010-10-03 15:26:07 -03:00
|
|
|
def test_putheader(self):
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket(None)
|
|
|
|
conn.putrequest('GET','/')
|
|
|
|
conn.putheader('Content-length',42)
|
2014-02-08 08:49:55 -04:00
|
|
|
self.assertIn('Content-length: 42', conn._buffer)
|
2010-10-03 15:26:07 -03:00
|
|
|
|
2015-03-12 06:12:51 -03:00
|
|
|
conn.putheader('Foo', ' bar ')
|
|
|
|
self.assertIn(b'Foo: bar ', conn._buffer)
|
|
|
|
conn.putheader('Bar', '\tbaz\t')
|
|
|
|
self.assertIn(b'Bar: \tbaz\t', conn._buffer)
|
|
|
|
conn.putheader('Authorization', 'Bearer mytoken')
|
|
|
|
self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
|
|
|
|
conn.putheader('IterHeader', 'IterA', 'IterB')
|
|
|
|
self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
|
|
|
|
conn.putheader('LatinHeader', b'\xFF')
|
|
|
|
self.assertIn(b'LatinHeader: \xFF', conn._buffer)
|
|
|
|
conn.putheader('Utf8Header', b'\xc3\x80')
|
|
|
|
self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
|
|
|
|
conn.putheader('C1-Control', b'next\x85line')
|
|
|
|
self.assertIn(b'C1-Control: next\x85line', conn._buffer)
|
|
|
|
conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
|
|
|
|
self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
|
|
|
|
conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
|
|
|
|
self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
|
|
|
|
conn.putheader('Key Space', 'value')
|
|
|
|
self.assertIn(b'Key Space: value', conn._buffer)
|
|
|
|
conn.putheader('KeySpace ', 'value')
|
|
|
|
self.assertIn(b'KeySpace : value', conn._buffer)
|
|
|
|
conn.putheader(b'Nonbreak\xa0Space', 'value')
|
|
|
|
self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
|
|
|
|
conn.putheader(b'\xa0NonbreakSpace', 'value')
|
|
|
|
self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
|
|
|
|
|
2010-11-13 23:31:52 -04:00
|
|
|
def test_ipv6host_header(self):
|
2016-05-29 05:13:58 -03:00
|
|
|
# Default host header on IPv6 transaction should be wrapped by [] if
|
|
|
|
# it is an IPv6 address
|
2010-11-13 23:31:52 -04:00
|
|
|
expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
|
|
|
|
'Accept-Encoding: identity\r\n\r\n'
|
|
|
|
conn = httplib.HTTPConnection('[2001::]:81')
|
|
|
|
sock = FakeSocket('')
|
|
|
|
conn.sock = sock
|
|
|
|
conn.request('GET', '/foo')
|
|
|
|
self.assertTrue(sock.data.startswith(expected))
|
|
|
|
|
|
|
|
expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
|
|
|
|
'Accept-Encoding: identity\r\n\r\n'
|
|
|
|
conn = httplib.HTTPConnection('[2001:102A::]')
|
|
|
|
sock = FakeSocket('')
|
|
|
|
conn.sock = sock
|
|
|
|
conn.request('GET', '/foo')
|
|
|
|
self.assertTrue(sock.data.startswith(expected))
|
|
|
|
|
2015-01-26 00:34:42 -04:00
|
|
|
def test_malformed_headers_coped_with(self):
|
|
|
|
# Issue 19996
|
|
|
|
body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
resp.begin()
|
|
|
|
|
|
|
|
self.assertEqual(resp.getheader('First'), 'val')
|
|
|
|
self.assertEqual(resp.getheader('Second'), 'val')
|
|
|
|
|
2016-09-15 23:54:11 -03:00
|
|
|
def test_malformed_truncation(self):
|
|
|
|
# Other malformed header lines, especially without colons, used to
|
|
|
|
# cause the rest of the header section to be truncated
|
|
|
|
resp = (
|
|
|
|
b'HTTP/1.1 200 OK\r\n'
|
|
|
|
b'Public-Key-Pins: \n'
|
|
|
|
b'pin-sha256="xxx=";\n'
|
|
|
|
b'report-uri="https://..."\r\n'
|
|
|
|
b'Transfer-Encoding: chunked\r\n'
|
|
|
|
b'\r\n'
|
|
|
|
b'4\r\nbody\r\n0\r\n\r\n'
|
|
|
|
)
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertIsNotNone(resp.getheader('Public-Key-Pins'))
|
|
|
|
self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
|
|
|
|
self.assertEqual(resp.read(), b'body')
|
|
|
|
|
|
|
|
def test_blank_line_forms(self):
|
|
|
|
# Test that both CRLF and LF blank lines can terminate the header
|
|
|
|
# section and start the body
|
|
|
|
for blank in (b'\r\n', b'\n'):
|
|
|
|
resp = b'HTTP/1.1 200 OK\r\n' b'Transfer-Encoding: chunked\r\n'
|
|
|
|
resp += blank
|
|
|
|
resp += b'4\r\nbody\r\n0\r\n\r\n'
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
|
|
|
|
self.assertEqual(resp.read(), b'body')
|
|
|
|
|
|
|
|
resp = b'HTTP/1.0 200 OK\r\n' + blank + b'body'
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(), b'body')
|
|
|
|
|
|
|
|
# A blank line ending in CR is not treated as the end of the HTTP
|
|
|
|
# header section, therefore header fields following it should be
|
|
|
|
# parsed if possible
|
|
|
|
resp = (
|
|
|
|
b'HTTP/1.1 200 OK\r\n'
|
|
|
|
b'\r'
|
|
|
|
b'Name: value\r\n'
|
|
|
|
b'Transfer-Encoding: chunked\r\n'
|
|
|
|
b'\r\n'
|
|
|
|
b'4\r\nbody\r\n0\r\n\r\n'
|
|
|
|
)
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
|
|
|
|
self.assertEqual(resp.read(), b'body')
|
|
|
|
|
|
|
|
# No header fields nor blank line
|
|
|
|
resp = b'HTTP/1.0 200 OK\r\n'
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(), b'')
|
|
|
|
|
|
|
|
def test_from_line(self):
|
|
|
|
# The parser handles "From" lines specially, so test this does not
|
|
|
|
# affect parsing the rest of the header section
|
|
|
|
resp = (
|
|
|
|
b'HTTP/1.1 200 OK\r\n'
|
|
|
|
b'From start\r\n'
|
|
|
|
b' continued\r\n'
|
|
|
|
b'Name: value\r\n'
|
|
|
|
b'From middle\r\n'
|
|
|
|
b' continued\r\n'
|
|
|
|
b'Transfer-Encoding: chunked\r\n'
|
|
|
|
b'From end\r\n'
|
|
|
|
b'\r\n'
|
|
|
|
b'4\r\nbody\r\n0\r\n\r\n'
|
|
|
|
)
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertIsNotNone(resp.getheader('Name'))
|
|
|
|
self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
|
|
|
|
self.assertEqual(resp.read(), b'body')
|
|
|
|
|
|
|
|
resp = (
|
|
|
|
b'HTTP/1.0 200 OK\r\n'
|
|
|
|
b'From alone\r\n'
|
|
|
|
b'\r\n'
|
|
|
|
b'body'
|
|
|
|
)
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(resp))
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(), b'body')
|
|
|
|
|
|
|
|
def test_parse_all_octets(self):
|
|
|
|
# Ensure no valid header field octet breaks the parser
|
|
|
|
body = (
|
|
|
|
b'HTTP/1.1 200 OK\r\n'
|
|
|
|
b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters
|
|
|
|
b'VCHAR: ' + bytearray(range(0x21, 0x7E + 1)) + b'\r\n'
|
|
|
|
b'obs-text: ' + bytearray(range(0x80, 0xFF + 1)) + b'\r\n'
|
|
|
|
b'obs-fold: text\r\n'
|
|
|
|
b' folded with space\r\n'
|
|
|
|
b'\tfolded with tab\r\n'
|
|
|
|
b'Content-Length: 0\r\n'
|
|
|
|
b'\r\n'
|
|
|
|
)
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.getheader('Content-Length'), '0')
|
|
|
|
self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
|
|
|
|
vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
|
|
|
|
self.assertEqual(resp.getheader('VCHAR'), vchar)
|
|
|
|
self.assertIsNotNone(resp.getheader('obs-text'))
|
|
|
|
folded = resp.getheader('obs-fold')
|
|
|
|
self.assertTrue(folded.startswith('text'))
|
|
|
|
self.assertIn(' folded with space', folded)
|
|
|
|
self.assertTrue(folded.endswith('folded with tab'))
|
|
|
|
|
2015-03-12 06:12:51 -03:00
|
|
|
def test_invalid_headers(self):
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
conn.sock = FakeSocket('')
|
|
|
|
conn.putrequest('GET', '/')
|
|
|
|
|
|
|
|
# http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
|
|
|
|
# longer allowed in header names
|
|
|
|
cases = (
|
|
|
|
(b'Invalid\r\nName', b'ValidValue'),
|
|
|
|
(b'Invalid\rName', b'ValidValue'),
|
|
|
|
(b'Invalid\nName', b'ValidValue'),
|
|
|
|
(b'\r\nInvalidName', b'ValidValue'),
|
|
|
|
(b'\rInvalidName', b'ValidValue'),
|
|
|
|
(b'\nInvalidName', b'ValidValue'),
|
|
|
|
(b' InvalidName', b'ValidValue'),
|
|
|
|
(b'\tInvalidName', b'ValidValue'),
|
|
|
|
(b'Invalid:Name', b'ValidValue'),
|
|
|
|
(b':InvalidName', b'ValidValue'),
|
|
|
|
(b'ValidName', b'Invalid\r\nValue'),
|
|
|
|
(b'ValidName', b'Invalid\rValue'),
|
|
|
|
(b'ValidName', b'Invalid\nValue'),
|
|
|
|
(b'ValidName', b'InvalidValue\r\n'),
|
|
|
|
(b'ValidName', b'InvalidValue\r'),
|
|
|
|
(b'ValidName', b'InvalidValue\n'),
|
|
|
|
)
|
|
|
|
for name, value in cases:
|
|
|
|
with self.assertRaisesRegexp(ValueError, 'Invalid header'):
|
|
|
|
conn.putheader(name, value)
|
|
|
|
|
2010-11-13 23:31:52 -04:00
|
|
|
|
2006-10-29 16:24:01 -04:00
|
|
|
class BasicTest(TestCase):
|
|
|
|
def test_status_lines(self):
|
|
|
|
# Test HTTP status lines
|
|
|
|
|
|
|
|
body = "HTTP/1.1 200 Ok\r\n\r\nText"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
2003-01-23 14:02:20 -04:00
|
|
|
resp.begin()
|
2013-12-17 15:49:48 -04:00
|
|
|
self.assertEqual(resp.read(0), '') # Issue #20007
|
|
|
|
self.assertFalse(resp.isclosed())
|
2006-10-29 16:24:01 -04:00
|
|
|
self.assertEqual(resp.read(), 'Text')
|
2007-10-18 00:16:03 -03:00
|
|
|
self.assertTrue(resp.isclosed())
|
2006-10-29 16:24:01 -04:00
|
|
|
|
|
|
|
body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
self.assertRaises(httplib.BadStatusLine, resp.begin)
|
|
|
|
|
2010-02-24 00:49:00 -04:00
|
|
|
def test_bad_status_repr(self):
|
|
|
|
exc = httplib.BadStatusLine('')
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
|
2010-02-24 00:49:00 -04:00
|
|
|
|
2007-10-18 00:16:03 -03:00
|
|
|
def test_partial_reads(self):
|
2012-12-15 14:11:54 -04:00
|
|
|
# if we have a length, the system knows when to close itself
|
2007-10-18 00:16:03 -03:00
|
|
|
# same behaviour than when we read the whole thing with read()
|
|
|
|
body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(2), 'Te')
|
|
|
|
self.assertFalse(resp.isclosed())
|
|
|
|
self.assertEqual(resp.read(2), 'xt')
|
|
|
|
self.assertTrue(resp.isclosed())
|
|
|
|
|
2012-12-15 14:11:54 -04:00
|
|
|
def test_partial_reads_no_content_length(self):
|
|
|
|
# when no length is present, the socket should be gracefully closed when
|
|
|
|
# all data was read
|
|
|
|
body = "HTTP/1.1 200 Ok\r\n\r\nText"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(2), 'Te')
|
|
|
|
self.assertFalse(resp.isclosed())
|
|
|
|
self.assertEqual(resp.read(2), 'xt')
|
|
|
|
self.assertEqual(resp.read(1), '')
|
|
|
|
self.assertTrue(resp.isclosed())
|
|
|
|
|
2013-02-02 17:49:34 -04:00
|
|
|
def test_partial_reads_incomplete_body(self):
|
|
|
|
# if the server shuts down the connection before the whole
|
|
|
|
# content-length is delivered, the socket is gracefully closed
|
|
|
|
body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(2), 'Te')
|
|
|
|
self.assertFalse(resp.isclosed())
|
|
|
|
self.assertEqual(resp.read(2), 'xt')
|
|
|
|
self.assertEqual(resp.read(1), '')
|
|
|
|
self.assertTrue(resp.isclosed())
|
|
|
|
|
2006-10-29 16:24:01 -04:00
|
|
|
def test_host_port(self):
|
|
|
|
# Check invalid host_port
|
|
|
|
|
2011-10-18 12:16:00 -03:00
|
|
|
# Note that httplib does not accept user:password@ in the host-port.
|
|
|
|
for hp in ("www.python.org:abc", "user:password@www.python.org"):
|
2006-10-29 16:24:01 -04:00
|
|
|
self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)
|
|
|
|
|
2008-11-28 20:09:16 -04:00
|
|
|
for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
|
|
|
|
8000),
|
2006-10-29 16:24:01 -04:00
|
|
|
("www.python.org:80", "www.python.org", 80),
|
|
|
|
("www.python.org", "www.python.org", 80),
|
2011-10-18 12:16:00 -03:00
|
|
|
("www.python.org:", "www.python.org", 80),
|
2006-10-29 16:24:01 -04:00
|
|
|
("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
|
2004-09-14 18:45:36 -03:00
|
|
|
http = httplib.HTTP(hp)
|
2006-10-29 16:24:01 -04:00
|
|
|
c = http._conn
|
2008-11-28 20:09:16 -04:00
|
|
|
if h != c.host:
|
|
|
|
self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
|
|
|
|
if p != c.port:
|
|
|
|
self.fail("Port incorrectly parsed: %s != %s" % (p, c.host))
|
2006-10-29 16:24:01 -04:00
|
|
|
|
|
|
|
def test_response_headers(self):
|
|
|
|
# test response with multiple message headers with the same field name.
|
|
|
|
text = ('HTTP/1.1 200 OK\r\n'
|
2008-11-28 20:09:16 -04:00
|
|
|
'Set-Cookie: Customer="WILE_E_COYOTE";'
|
|
|
|
' Version="1"; Path="/acme"\r\n'
|
2006-10-29 16:24:01 -04:00
|
|
|
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
|
|
|
|
' Path="/acme"\r\n'
|
|
|
|
'\r\n'
|
|
|
|
'No body\r\n')
|
|
|
|
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
|
|
|
|
', '
|
|
|
|
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
|
|
|
|
s = FakeSocket(text)
|
|
|
|
r = httplib.HTTPResponse(s)
|
|
|
|
r.begin()
|
|
|
|
cookies = r.getheader("Set-Cookie")
|
|
|
|
if cookies != hdr:
|
|
|
|
self.fail("multiple headers not combined properly")
|
|
|
|
|
|
|
|
def test_read_head(self):
|
|
|
|
# Test that the library doesn't attempt to read any data
|
|
|
|
# from a HEAD request. (Tickles SF bug #622042.)
|
|
|
|
sock = FakeSocket(
|
|
|
|
'HTTP/1.1 200 OK\r\n'
|
|
|
|
'Content-Length: 14432\r\n'
|
|
|
|
'\r\n',
|
|
|
|
NoEOFStringIO)
|
|
|
|
resp = httplib.HTTPResponse(sock, method="HEAD")
|
|
|
|
resp.begin()
|
|
|
|
if resp.read() != "":
|
|
|
|
self.fail("Did not expect response from HEAD request")
|
2003-05-05 13:13:58 -03:00
|
|
|
|
2014-08-05 01:15:57 -03:00
|
|
|
def test_too_many_headers(self):
|
|
|
|
headers = '\r\n'.join('Header%d: foo' % i for i in xrange(200)) + '\r\n'
|
|
|
|
text = ('HTTP/1.1 200 OK\r\n' + headers)
|
|
|
|
s = FakeSocket(text)
|
|
|
|
r = httplib.HTTPResponse(s)
|
|
|
|
self.assertRaises(httplib.HTTPException, r.begin)
|
|
|
|
|
2006-11-12 06:32:47 -04:00
|
|
|
def test_send_file(self):
|
|
|
|
expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
|
|
|
|
'Accept-Encoding: identity\r\nContent-Length:'
|
|
|
|
|
|
|
|
body = open(__file__, 'rb')
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
conn.sock = sock
|
|
|
|
conn.request('GET', '/foo', body)
|
|
|
|
self.assertTrue(sock.data.startswith(expected))
|
2015-05-16 12:58:41 -03:00
|
|
|
self.assertIn('def test_send_file', sock.data)
|
|
|
|
|
|
|
|
def test_send_tempfile(self):
|
|
|
|
expected = ('GET /foo HTTP/1.1\r\nHost: example.com\r\n'
|
|
|
|
'Accept-Encoding: identity\r\nContent-Length: 9\r\n\r\n'
|
|
|
|
'fake\ndata')
|
|
|
|
|
|
|
|
with tempfile.TemporaryFile() as body:
|
|
|
|
body.write('fake\ndata')
|
|
|
|
body.seek(0)
|
|
|
|
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
conn.sock = sock
|
|
|
|
conn.request('GET', '/foo', body)
|
|
|
|
self.assertEqual(sock.data, expected)
|
2004-08-07 13:28:14 -03:00
|
|
|
|
2009-09-29 14:48:18 -03:00
|
|
|
def test_send(self):
|
|
|
|
expected = 'this is a test this is only a test'
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
sock = FakeSocket(None)
|
|
|
|
conn.sock = sock
|
|
|
|
conn.send(expected)
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(expected, sock.data)
|
2009-09-29 14:48:18 -03:00
|
|
|
sock.data = ''
|
|
|
|
conn.send(array.array('c', expected))
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(expected, sock.data)
|
2009-09-29 14:48:18 -03:00
|
|
|
sock.data = ''
|
|
|
|
conn.send(StringIO.StringIO(expected))
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(expected, sock.data)
|
2009-09-29 14:48:18 -03:00
|
|
|
|
2008-02-23 20:03:22 -04:00
|
|
|
def test_chunked(self):
|
|
|
|
chunked_start = (
|
|
|
|
'HTTP/1.1 200 OK\r\n'
|
|
|
|
'Transfer-Encoding: chunked\r\n\r\n'
|
|
|
|
'a\r\n'
|
|
|
|
'hello worl\r\n'
|
|
|
|
'1\r\n'
|
|
|
|
'd\r\n'
|
|
|
|
)
|
|
|
|
sock = FakeSocket(chunked_start + '0\r\n')
|
|
|
|
resp = httplib.HTTPResponse(sock, method="GET")
|
|
|
|
resp.begin()
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(resp.read(), 'hello world')
|
2008-02-23 20:03:22 -04:00
|
|
|
resp.close()
|
|
|
|
|
|
|
|
for x in ('', 'foo\r\n'):
|
|
|
|
sock = FakeSocket(chunked_start + x)
|
|
|
|
resp = httplib.HTTPResponse(sock, method="GET")
|
|
|
|
resp.begin()
|
|
|
|
try:
|
|
|
|
resp.read()
|
|
|
|
except httplib.IncompleteRead, i:
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(i.partial, 'hello world')
|
2009-03-02 18:41:42 -04:00
|
|
|
self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
|
|
|
|
self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
|
2008-02-23 20:03:22 -04:00
|
|
|
else:
|
|
|
|
self.fail('IncompleteRead expected')
|
|
|
|
finally:
|
|
|
|
resp.close()
|
|
|
|
|
2010-04-28 14:20:43 -03:00
|
|
|
def test_chunked_head(self):
|
|
|
|
chunked_start = (
|
|
|
|
'HTTP/1.1 200 OK\r\n'
|
|
|
|
'Transfer-Encoding: chunked\r\n\r\n'
|
|
|
|
'a\r\n'
|
|
|
|
'hello world\r\n'
|
|
|
|
'1\r\n'
|
|
|
|
'd\r\n'
|
|
|
|
)
|
|
|
|
sock = FakeSocket(chunked_start + '0\r\n')
|
|
|
|
resp = httplib.HTTPResponse(sock, method="HEAD")
|
|
|
|
resp.begin()
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(resp.read(), '')
|
|
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
self.assertEqual(resp.reason, 'OK')
|
2010-06-04 14:17:09 -03:00
|
|
|
self.assertTrue(resp.isclosed())
|
2010-04-28 14:20:43 -03:00
|
|
|
|
2008-02-23 20:14:24 -04:00
|
|
|
def test_negative_content_length(self):
|
2008-11-28 20:09:16 -04:00
|
|
|
sock = FakeSocket('HTTP/1.1 200 OK\r\n'
|
|
|
|
'Content-Length: -1\r\n\r\nHello\r\n')
|
2008-02-23 20:14:24 -04:00
|
|
|
resp = httplib.HTTPResponse(sock, method="GET")
|
|
|
|
resp.begin()
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(resp.read(), 'Hello\r\n')
|
2013-02-02 17:49:34 -04:00
|
|
|
self.assertTrue(resp.isclosed())
|
2008-02-23 20:14:24 -04:00
|
|
|
|
2009-03-02 18:41:42 -04:00
|
|
|
def test_incomplete_read(self):
|
|
|
|
sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
|
|
|
|
resp = httplib.HTTPResponse(sock, method="GET")
|
|
|
|
resp.begin()
|
|
|
|
try:
|
|
|
|
resp.read()
|
|
|
|
except httplib.IncompleteRead as i:
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(i.partial, 'Hello\r\n')
|
2009-03-02 18:41:42 -04:00
|
|
|
self.assertEqual(repr(i),
|
|
|
|
"IncompleteRead(7 bytes read, 3 more expected)")
|
|
|
|
self.assertEqual(str(i),
|
|
|
|
"IncompleteRead(7 bytes read, 3 more expected)")
|
2013-02-02 17:49:34 -04:00
|
|
|
self.assertTrue(resp.isclosed())
|
2009-03-02 18:41:42 -04:00
|
|
|
else:
|
|
|
|
self.fail('IncompleteRead expected')
|
|
|
|
|
2010-07-23 23:46:16 -03:00
|
|
|
def test_epipe(self):
|
|
|
|
sock = EPipeSocket(
|
|
|
|
"HTTP/1.0 401 Authorization Required\r\n"
|
|
|
|
"Content-type: text/html\r\n"
|
|
|
|
"WWW-Authenticate: Basic realm=\"example\"\r\n",
|
|
|
|
b"Content-Length")
|
|
|
|
conn = httplib.HTTPConnection("example.com")
|
|
|
|
conn.sock = sock
|
|
|
|
self.assertRaises(socket.error,
|
|
|
|
lambda: conn.request("PUT", "/url", "body"))
|
|
|
|
resp = conn.getresponse()
|
|
|
|
self.assertEqual(401, resp.status)
|
|
|
|
self.assertEqual("Basic realm=\"example\"",
|
|
|
|
resp.getheader("www-authenticate"))
|
|
|
|
|
2010-09-20 22:38:15 -03:00
|
|
|
def test_filenoattr(self):
|
|
|
|
# Just test the fileno attribute in the HTTPResponse Object.
|
|
|
|
body = "HTTP/1.1 200 Ok\r\n\r\nText"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
self.assertTrue(hasattr(resp,'fileno'),
|
|
|
|
'HTTPResponse should expose a fileno attribute')
|
2008-02-23 20:03:22 -04:00
|
|
|
|
2016-09-15 23:54:11 -03:00
|
|
|
# Test lines overflowing the max line size (_MAXLINE in httplib)
|
2010-12-18 14:18:21 -04:00
|
|
|
|
|
|
|
def test_overflowing_status_line(self):
|
|
|
|
self.skipTest("disabled for HTTP 0.9 support")
|
|
|
|
body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(body))
|
|
|
|
self.assertRaises((httplib.LineTooLong, httplib.BadStatusLine), resp.begin)
|
|
|
|
|
|
|
|
def test_overflowing_header_line(self):
|
|
|
|
body = (
|
|
|
|
'HTTP/1.1 200 OK\r\n'
|
|
|
|
'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
|
|
|
|
)
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(body))
|
|
|
|
self.assertRaises(httplib.LineTooLong, resp.begin)
|
|
|
|
|
|
|
|
def test_overflowing_chunked_line(self):
|
|
|
|
body = (
|
|
|
|
'HTTP/1.1 200 OK\r\n'
|
|
|
|
'Transfer-Encoding: chunked\r\n\r\n'
|
|
|
|
+ '0' * 65536 + 'a\r\n'
|
|
|
|
'hello world\r\n'
|
|
|
|
'0\r\n'
|
|
|
|
)
|
|
|
|
resp = httplib.HTTPResponse(FakeSocket(body))
|
|
|
|
resp.begin()
|
|
|
|
self.assertRaises(httplib.LineTooLong, resp.read)
|
|
|
|
|
2012-04-28 23:15:31 -03:00
|
|
|
def test_early_eof(self):
|
|
|
|
# Test httpresponse with no \r\n termination,
|
|
|
|
body = "HTTP/1.1 200 Ok"
|
|
|
|
sock = FakeSocket(body)
|
|
|
|
resp = httplib.HTTPResponse(sock)
|
|
|
|
resp.begin()
|
|
|
|
self.assertEqual(resp.read(), '')
|
|
|
|
self.assertTrue(resp.isclosed())
|
2010-12-18 14:18:21 -04:00
|
|
|
|
2014-12-01 07:07:28 -04:00
|
|
|
def test_error_leak(self):
|
|
|
|
# Test that the socket is not leaked if getresponse() fails
|
|
|
|
conn = httplib.HTTPConnection('example.com')
|
|
|
|
response = []
|
|
|
|
class Response(httplib.HTTPResponse):
|
|
|
|
def __init__(self, *pos, **kw):
|
|
|
|
response.append(self) # Avoid garbage collector closing the socket
|
|
|
|
httplib.HTTPResponse.__init__(self, *pos, **kw)
|
|
|
|
conn.response_class = Response
|
|
|
|
conn.sock = FakeSocket('') # Emulate server dropping connection
|
|
|
|
conn.request('GET', '/')
|
|
|
|
self.assertRaises(httplib.BadStatusLine, conn.getresponse)
|
|
|
|
self.assertTrue(response)
|
|
|
|
#self.assertTrue(response[0].closed)
|
|
|
|
self.assertTrue(conn.sock.file_closed)
|
|
|
|
|
2015-09-06 22:18:47 -03:00
|
|
|
def test_proxy_tunnel_without_status_line(self):
|
|
|
|
# Issue 17849: If a proxy tunnel is created that does not return
|
|
|
|
# a status code, fail.
|
|
|
|
body = 'hello world'
|
|
|
|
conn = httplib.HTTPConnection('example.com', strict=False)
|
|
|
|
conn.set_tunnel('foo')
|
|
|
|
conn.sock = FakeSocket(body)
|
|
|
|
with self.assertRaisesRegexp(socket.error, "Invalid response"):
|
|
|
|
conn._tunnel()
|
|
|
|
|
2006-02-17 18:01:08 -04:00
|
|
|
class OfflineTest(TestCase):
|
|
|
|
def test_responses(self):
|
2010-11-21 09:34:58 -04:00
|
|
|
self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
|
2006-02-17 18:01:08 -04:00
|
|
|
|
2010-01-02 22:06:07 -04:00
|
|
|
|
2015-01-24 16:58:10 -04:00
|
|
|
class TestServerMixin:
|
|
|
|
"""A limited socket server mixin.
|
|
|
|
|
|
|
|
This is used by test cases for testing http connection end points.
|
|
|
|
"""
|
2010-01-02 22:06:07 -04:00
|
|
|
def setUp(self):
|
|
|
|
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
self.port = test_support.bind_port(self.serv)
|
|
|
|
self.source_port = test_support.find_unused_port()
|
|
|
|
self.serv.listen(5)
|
|
|
|
self.conn = None
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
if self.conn:
|
|
|
|
self.conn.close()
|
|
|
|
self.conn = None
|
|
|
|
self.serv.close()
|
|
|
|
self.serv = None
|
|
|
|
|
2015-01-24 16:58:10 -04:00
|
|
|
class SourceAddressTest(TestServerMixin, TestCase):
|
2010-01-02 22:06:07 -04:00
|
|
|
def testHTTPConnectionSourceAddress(self):
|
|
|
|
self.conn = httplib.HTTPConnection(HOST, self.port,
|
|
|
|
source_address=('', self.source_port))
|
|
|
|
self.conn.connect()
|
|
|
|
self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
|
|
|
|
|
|
|
|
@unittest.skipIf(not hasattr(httplib, 'HTTPSConnection'),
|
|
|
|
'httplib.HTTPSConnection not defined')
|
|
|
|
def testHTTPSConnectionSourceAddress(self):
|
|
|
|
self.conn = httplib.HTTPSConnection(HOST, self.port,
|
|
|
|
source_address=('', self.source_port))
|
2016-10-09 22:00:00 -03:00
|
|
|
# We don't test anything here other than the constructor not barfing as
|
2010-01-02 22:06:07 -04:00
|
|
|
# this code doesn't deal with setting up an active running SSL server
|
|
|
|
# for an ssl_wrapped connect() to actually return from.
|
|
|
|
|
|
|
|
|
2015-01-24 16:58:10 -04:00
|
|
|
class HTTPTest(TestServerMixin, TestCase):
|
|
|
|
def testHTTPConnection(self):
|
|
|
|
self.conn = httplib.HTTP(host=HOST, port=self.port, strict=None)
|
|
|
|
self.conn.connect()
|
|
|
|
self.assertEqual(self.conn._conn.host, HOST)
|
|
|
|
self.assertEqual(self.conn._conn.port, self.port)
|
|
|
|
|
|
|
|
def testHTTPWithConnectHostPort(self):
|
|
|
|
testhost = 'unreachable.test.domain'
|
|
|
|
testport = '80'
|
|
|
|
self.conn = httplib.HTTP(host=testhost, port=testport)
|
|
|
|
self.conn.connect(host=HOST, port=self.port)
|
|
|
|
self.assertNotEqual(self.conn._conn.host, testhost)
|
|
|
|
self.assertNotEqual(self.conn._conn.port, testport)
|
|
|
|
self.assertEqual(self.conn._conn.host, HOST)
|
|
|
|
self.assertEqual(self.conn._conn.port, self.port)
|
|
|
|
|
|
|
|
|
2007-03-23 15:54:07 -03:00
|
|
|
class TimeoutTest(TestCase):
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
PORT = None
|
2007-04-25 03:30:05 -03:00
|
|
|
|
2007-03-23 15:54:07 -03:00
|
|
|
def setUp(self):
|
|
|
|
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
2008-04-08 21:34:53 -03:00
|
|
|
TimeoutTest.PORT = test_support.bind_port(self.serv)
|
2007-03-25 00:20:05 -03:00
|
|
|
self.serv.listen(5)
|
2007-03-23 15:54:07 -03:00
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
self.serv.close()
|
|
|
|
self.serv = None
|
|
|
|
|
|
|
|
def testTimeoutAttribute(self):
|
|
|
|
'''This will prove that the timeout gets through
|
|
|
|
HTTPConnection and into the socket.
|
|
|
|
'''
|
2008-05-29 13:39:26 -03:00
|
|
|
# default -- use global socket timeout
|
2014-02-08 08:49:55 -04:00
|
|
|
self.assertIsNone(socket.getdefaulttimeout())
|
2008-05-29 13:39:26 -03:00
|
|
|
socket.setdefaulttimeout(30)
|
|
|
|
try:
|
|
|
|
httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
|
|
|
|
httpConn.connect()
|
|
|
|
finally:
|
|
|
|
socket.setdefaulttimeout(None)
|
2007-03-23 17:23:08 -03:00
|
|
|
self.assertEqual(httpConn.sock.gettimeout(), 30)
|
2007-03-25 00:20:05 -03:00
|
|
|
httpConn.close()
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2008-05-29 13:39:26 -03:00
|
|
|
# no timeout -- do not use global socket default
|
2014-02-08 08:49:55 -04:00
|
|
|
self.assertIsNone(socket.getdefaulttimeout())
|
2007-03-23 17:23:08 -03:00
|
|
|
socket.setdefaulttimeout(30)
|
|
|
|
try:
|
2008-04-08 21:34:53 -03:00
|
|
|
httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
|
|
|
|
timeout=None)
|
2007-03-23 17:23:08 -03:00
|
|
|
httpConn.connect()
|
|
|
|
finally:
|
2008-05-29 13:39:26 -03:00
|
|
|
socket.setdefaulttimeout(None)
|
|
|
|
self.assertEqual(httpConn.sock.gettimeout(), None)
|
|
|
|
httpConn.close()
|
|
|
|
|
|
|
|
# a value
|
|
|
|
httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
|
|
|
|
httpConn.connect()
|
2007-03-23 17:23:08 -03:00
|
|
|
self.assertEqual(httpConn.sock.gettimeout(), 30)
|
2007-03-25 00:20:05 -03:00
|
|
|
httpConn.close()
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2015-01-24 16:58:10 -04:00
|
|
|
|
2014-11-23 13:42:45 -04:00
|
|
|
class HTTPSTest(TestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
if not hasattr(httplib, 'HTTPSConnection'):
|
|
|
|
self.skipTest('ssl support required')
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2014-11-23 13:42:45 -04:00
|
|
|
def make_server(self, certfile):
|
|
|
|
from test.ssl_servers import make_https_server
|
|
|
|
return make_https_server(self, certfile=certfile)
|
2007-05-21 14:32:32 -03:00
|
|
|
|
|
|
|
def test_attributes(self):
|
2014-11-23 13:42:45 -04:00
|
|
|
# simple test to check it's storing the timeout
|
|
|
|
h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
|
|
|
|
self.assertEqual(h.timeout, 30)
|
|
|
|
|
|
|
|
def test_networked(self):
|
|
|
|
# Default settings: requires a valid cert from a trusted CA
|
|
|
|
import ssl
|
|
|
|
test_support.requires('network')
|
|
|
|
with test_support.transient_internet('self-signed.pythontest.net'):
|
|
|
|
h = httplib.HTTPSConnection('self-signed.pythontest.net', 443)
|
|
|
|
with self.assertRaises(ssl.SSLError) as exc_info:
|
|
|
|
h.request('GET', '/')
|
|
|
|
self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
|
|
|
|
|
|
|
|
def test_networked_noverification(self):
|
|
|
|
# Switch off cert verification
|
|
|
|
import ssl
|
|
|
|
test_support.requires('network')
|
|
|
|
with test_support.transient_internet('self-signed.pythontest.net'):
|
|
|
|
context = ssl._create_stdlib_context()
|
|
|
|
h = httplib.HTTPSConnection('self-signed.pythontest.net', 443,
|
|
|
|
context=context)
|
|
|
|
h.request('GET', '/')
|
|
|
|
resp = h.getresponse()
|
|
|
|
self.assertIn('nginx', resp.getheader('server'))
|
|
|
|
|
2014-11-25 17:16:55 -04:00
|
|
|
@test_support.system_must_validate_cert
|
2014-11-23 13:42:45 -04:00
|
|
|
def test_networked_trusted_by_default_cert(self):
|
|
|
|
# Default settings: requires a valid cert from a trusted CA
|
|
|
|
test_support.requires('network')
|
|
|
|
with test_support.transient_internet('www.python.org'):
|
|
|
|
h = httplib.HTTPSConnection('www.python.org', 443)
|
|
|
|
h.request('GET', '/')
|
|
|
|
resp = h.getresponse()
|
|
|
|
content_type = resp.getheader('content-type')
|
|
|
|
self.assertIn('text/html', content_type)
|
|
|
|
|
|
|
|
def test_networked_good_cert(self):
|
|
|
|
# We feed the server's cert as a validating cert
|
|
|
|
import ssl
|
|
|
|
test_support.requires('network')
|
|
|
|
with test_support.transient_internet('self-signed.pythontest.net'):
|
|
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
|
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
|
|
|
context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
|
|
|
|
h = httplib.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
|
|
|
|
h.request('GET', '/')
|
|
|
|
resp = h.getresponse()
|
|
|
|
server_string = resp.getheader('server')
|
|
|
|
self.assertIn('nginx', server_string)
|
|
|
|
|
|
|
|
def test_networked_bad_cert(self):
|
|
|
|
# We feed a "CA" cert that is unrelated to the server's cert
|
|
|
|
import ssl
|
|
|
|
test_support.requires('network')
|
|
|
|
with test_support.transient_internet('self-signed.pythontest.net'):
|
|
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
|
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
|
|
|
context.load_verify_locations(CERT_localhost)
|
|
|
|
h = httplib.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
|
|
|
|
with self.assertRaises(ssl.SSLError) as exc_info:
|
|
|
|
h.request('GET', '/')
|
|
|
|
self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
|
|
|
|
|
|
|
|
def test_local_unknown_cert(self):
|
|
|
|
# The custom cert isn't known to the default trust bundle
|
|
|
|
import ssl
|
|
|
|
server = self.make_server(CERT_localhost)
|
|
|
|
h = httplib.HTTPSConnection('localhost', server.port)
|
|
|
|
with self.assertRaises(ssl.SSLError) as exc_info:
|
|
|
|
h.request('GET', '/')
|
|
|
|
self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
|
|
|
|
|
|
|
|
def test_local_good_hostname(self):
|
|
|
|
# The (valid) cert validates the HTTP hostname
|
|
|
|
import ssl
|
|
|
|
server = self.make_server(CERT_localhost)
|
|
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
|
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
|
|
|
context.load_verify_locations(CERT_localhost)
|
|
|
|
h = httplib.HTTPSConnection('localhost', server.port, context=context)
|
|
|
|
h.request('GET', '/nonexistent')
|
|
|
|
resp = h.getresponse()
|
|
|
|
self.assertEqual(resp.status, 404)
|
|
|
|
|
|
|
|
def test_local_bad_hostname(self):
|
|
|
|
# The (valid) cert doesn't validate the HTTP hostname
|
|
|
|
import ssl
|
|
|
|
server = self.make_server(CERT_fakehostname)
|
|
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
|
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
2014-12-07 14:41:26 -04:00
|
|
|
context.check_hostname = True
|
2014-11-23 13:42:45 -04:00
|
|
|
context.load_verify_locations(CERT_fakehostname)
|
|
|
|
h = httplib.HTTPSConnection('localhost', server.port, context=context)
|
|
|
|
with self.assertRaises(ssl.CertificateError):
|
|
|
|
h.request('GET', '/')
|
2014-12-07 14:41:26 -04:00
|
|
|
h.close()
|
|
|
|
# With context.check_hostname=False, the mismatching is ignored
|
|
|
|
context.check_hostname = False
|
|
|
|
h = httplib.HTTPSConnection('localhost', server.port, context=context)
|
2014-11-23 13:42:45 -04:00
|
|
|
h.request('GET', '/nonexistent')
|
|
|
|
resp = h.getresponse()
|
|
|
|
self.assertEqual(resp.status, 404)
|
2007-05-21 14:32:32 -03:00
|
|
|
|
2011-10-18 12:16:00 -03:00
|
|
|
def test_host_port(self):
|
|
|
|
# Check invalid host_port
|
|
|
|
|
|
|
|
for hp in ("www.python.org:abc", "user:password@www.python.org"):
|
2014-11-23 13:42:45 -04:00
|
|
|
self.assertRaises(httplib.InvalidURL, httplib.HTTPSConnection, hp)
|
2011-10-18 12:16:00 -03:00
|
|
|
|
2014-11-23 13:42:45 -04:00
|
|
|
for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
|
|
|
|
"fe80::207:e9ff:fe9b", 8000),
|
|
|
|
("www.python.org:443", "www.python.org", 443),
|
|
|
|
("www.python.org:", "www.python.org", 443),
|
|
|
|
("www.python.org", "www.python.org", 443),
|
|
|
|
("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
|
|
|
|
("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
|
|
|
|
443)):
|
|
|
|
c = httplib.HTTPSConnection(hp)
|
|
|
|
self.assertEqual(h, c.host)
|
|
|
|
self.assertEqual(p, c.port)
|
2011-10-18 12:16:00 -03:00
|
|
|
|
|
|
|
|
2014-05-16 22:51:46 -03:00
|
|
|
class TunnelTests(TestCase):
|
|
|
|
def test_connect(self):
|
|
|
|
response_text = (
|
|
|
|
'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
|
|
|
|
'HTTP/1.1 200 OK\r\n' # Reply to HEAD
|
|
|
|
'Content-Length: 42\r\n\r\n'
|
|
|
|
)
|
|
|
|
|
|
|
|
def create_connection(address, timeout=None, source_address=None):
|
|
|
|
return FakeSocket(response_text, host=address[0], port=address[1])
|
|
|
|
|
|
|
|
conn = httplib.HTTPConnection('proxy.com')
|
|
|
|
conn._create_connection = create_connection
|
|
|
|
|
|
|
|
# Once connected, we should not be able to tunnel anymore
|
|
|
|
conn.connect()
|
|
|
|
self.assertRaises(RuntimeError, conn.set_tunnel, 'destination.com')
|
|
|
|
|
|
|
|
# But if close the connection, we are good.
|
|
|
|
conn.close()
|
|
|
|
conn.set_tunnel('destination.com')
|
|
|
|
conn.request('HEAD', '/', '')
|
|
|
|
|
|
|
|
self.assertEqual(conn.sock.host, 'proxy.com')
|
|
|
|
self.assertEqual(conn.sock.port, 80)
|
2015-05-28 16:37:13 -03:00
|
|
|
self.assertIn('CONNECT destination.com', conn.sock.data)
|
|
|
|
# issue22095
|
|
|
|
self.assertNotIn('Host: destination.com:None', conn.sock.data)
|
|
|
|
self.assertIn('Host: destination.com', conn.sock.data)
|
2014-05-16 22:51:46 -03:00
|
|
|
|
2015-05-28 16:37:13 -03:00
|
|
|
self.assertNotIn('Host: proxy.com', conn.sock.data)
|
2014-05-16 22:51:46 -03:00
|
|
|
|
|
|
|
conn.close()
|
|
|
|
|
|
|
|
conn.request('PUT', '/', '')
|
|
|
|
self.assertEqual(conn.sock.host, 'proxy.com')
|
|
|
|
self.assertEqual(conn.sock.port, 80)
|
|
|
|
self.assertTrue('CONNECT destination.com' in conn.sock.data)
|
|
|
|
self.assertTrue('Host: destination.com' in conn.sock.data)
|
|
|
|
|
|
|
|
|
2014-11-23 13:42:45 -04:00
|
|
|
@test_support.reap_threads
|
2004-08-07 13:28:14 -03:00
|
|
|
def test_main(verbose=None):
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
|
2015-01-24 16:58:10 -04:00
|
|
|
HTTPTest, HTTPSTest, SourceAddressTest,
|
|
|
|
TunnelTests)
|
2004-08-07 13:28:14 -03:00
|
|
|
|
2006-10-29 16:24:01 -04:00
|
|
|
if __name__ == '__main__':
|
|
|
|
test_main()
|