From e159422ce9cd6cedff7b45eef5a00d7e6a2aa90c Mon Sep 17 00:00:00 2001 From: "Phillip J. Eby" Date: Tue, 2 Nov 2010 22:28:59 +0000 Subject: [PATCH] Update wsgiref for PEP 3333, and fix errors introduced into the test suite by converting type() checks to isinstance(). (When WSGI specifies a built-in type, it does NOT mean "this type or a subclass" -- it means 'type(x) is SpecifiedType'.) --- Lib/test/test_wsgiref.py | 47 ++++++++++---------------------------- Lib/wsgiref/handlers.py | 49 +++++++++++++++++----------------------- Lib/wsgiref/headers.py | 22 ++++++++++-------- Lib/wsgiref/validate.py | 42 ++++++++++++++++++---------------- 4 files changed, 67 insertions(+), 93 deletions(-) diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index b78d0c8adb1..49d372d6c66 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -47,7 +47,7 @@ def hello_app(environ,start_response): ('Content-Type','text/plain'), ('Date','Mon, 05 Jun 2006 18:49:54 GMT') ]) - return ["Hello, world!"] + return [b"Hello, world!"] def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"): server = make_server("", 80, app, MockServer, MockHandler) @@ -165,7 +165,7 @@ class IntegrationTests(TestCase): def test_wsgi_input(self): def bad_app(e,s): e["wsgi.input"].read() - s(b"200 OK", [(b"Content-Type", b"text/plain; charset=utf-8")]) + s("200 OK", [("Content-Type", "text/plain; charset=utf-8")]) return [b"data"] out, err = run_amock(validator(bad_app)) self.assertTrue(out.endswith( @@ -177,8 +177,8 @@ class IntegrationTests(TestCase): def test_bytes_validation(self): def app(e, s): - s(b"200 OK", [ - (b"Content-Type", b"text/plain; charset=utf-8"), + s("200 OK", [ + ("Content-Type", "text/plain; charset=utf-8"), ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"), ]) return [b"data"] @@ -420,29 +420,6 @@ class HeaderTests(TestCase): '\r\n' ) - def testBytes(self): - h = Headers([ - (b"Content-Type", b"text/plain; charset=utf-8"), - ]) - self.assertEqual("text/plain; charset=utf-8", h.get("Content-Type")) - - h[b"Foo"] = bytes(b"bar") - self.assertEqual("bar", h.get("Foo")) - self.assertEqual("bar", h.get(b"Foo")) - - h.setdefault(b"Bar", b"foo") - self.assertEqual("foo", h.get("Bar")) - self.assertEqual("foo", h.get(b"Bar")) - - h.add_header(b'content-disposition', b'attachment', - filename=b'bud.gif') - self.assertEqual('attachment; filename="bud.gif"', - h.get("content-disposition")) - - del h['content-disposition'] - self.assertNotIn(b'content-disposition', h) - - class ErrorHandler(BaseCGIHandler): """Simple handler subclass for testing BaseHandler""" @@ -529,10 +506,10 @@ class HandlerTests(TestCase): def trivial_app1(e,s): s('200 OK',[]) - return [e['wsgi.url_scheme']] + return [e['wsgi.url_scheme'].encode('iso-8859-1')] def trivial_app2(e,s): - s('200 OK',[])(e['wsgi.url_scheme']) + s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1')) return [] def trivial_app3(e,s): @@ -590,13 +567,13 @@ class HandlerTests(TestCase): ("Status: %s\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" - "\r\n%s" % (h.error_status,len(h.error_body),h.error_body) - ).encode("iso-8859-1")) + "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1') + + h.error_body) self.assertIn("AssertionError", h.stderr.getvalue()) def testErrorAfterOutput(self): - MSG = "Some output has been sent" + MSG = b"Some output has been sent" def error_app(e,s): s("200 OK",[])(MSG) raise AssertionError("This should be caught by handler") @@ -605,7 +582,7 @@ class HandlerTests(TestCase): h.run(error_app) self.assertEqual(h.stdout.getvalue(), ("Status: 200 OK\r\n" - "\r\n"+MSG).encode("iso-8859-1")) + "\r\n".encode("iso-8859-1")+MSG)) self.assertIn("AssertionError", h.stderr.getvalue()) @@ -654,8 +631,8 @@ class HandlerTests(TestCase): def testBytesData(self): def app(e, s): - s(b"200 OK", [ - (b"Content-Type", b"text/plain; charset=utf-8"), + s("200 OK", [ + ("Content-Type", "text/plain; charset=utf-8"), ]) return [b"data"] diff --git a/Lib/wsgiref/handlers.py b/Lib/wsgiref/handlers.py index a87c32ce851..3e112190959 100644 --- a/Lib/wsgiref/handlers.py +++ b/Lib/wsgiref/handlers.py @@ -46,7 +46,7 @@ class BaseHandler: traceback_limit = None # Print entire traceback to self.get_stderr() error_status = "500 Internal Server Error" error_headers = [('Content-Type','text/plain')] - error_body = "A server error occurred. Please contact the administrator." + error_body = b"A server error occurred. Please contact the administrator." # State variables (don't mess with these) status = result = None @@ -137,7 +137,7 @@ class BaseHandler: self.set_content_length() def start_response(self, status, headers,exc_info=None): - """'start_response()' callable as specified by PEP 333""" + """'start_response()' callable as specified by PEP 3333""" if exc_info: try: @@ -149,49 +149,48 @@ class BaseHandler: elif self.headers is not None: raise AssertionError("Headers already set!") + self.status = status + self.headers = self.headers_class(headers) status = self._convert_string_type(status, "Status") assert len(status)>=4,"Status must be at least 4 characters" assert int(status[:3]),"Status message must begin w/3-digit code" assert status[3]==" ", "Status message must have a space after code" - str_headers = [] - for name,val in headers: - name = self._convert_string_type(name, "Header name") - val = self._convert_string_type(val, "Header value") - str_headers.append((name, val)) - assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed" + if __debug__: + for name, val in headers: + name = self._convert_string_type(name, "Header name") + val = self._convert_string_type(val, "Header value") + assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed" - self.status = status - self.headers = self.headers_class(str_headers) return self.write def _convert_string_type(self, value, title): """Convert/check value type.""" - if isinstance(value, str): + if type(value) is str: return value - assert isinstance(value, bytes), \ - "{0} must be a string or bytes object (not {1})".format(title, value) - return str(value, "iso-8859-1") + raise AssertionError( + "{0} must be of type str (got {1})".format(title, repr(value)) + ) def send_preamble(self): """Transmit version/status/date/server, via self._write()""" if self.origin_server: if self.client_is_modern(): - self._write('HTTP/%s %s\r\n' % (self.http_version,self.status)) + self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1')) if 'Date' not in self.headers: self._write( - 'Date: %s\r\n' % format_date_time(time.time()) + ('Date: %s\r\n' % format_date_time(time.time())).encode('iso-8859-1') ) if self.server_software and 'Server' not in self.headers: - self._write('Server: %s\r\n' % self.server_software) + self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1')) else: - self._write('Status: %s\r\n' % self.status) + self._write(('Status: %s\r\n' % self.status).encode('iso-8859-1')) def write(self, data): - """'write()' callable as specified by PEP 333""" + """'write()' callable as specified by PEP 3333""" - assert isinstance(data, (str, bytes)), \ - "write() argument must be a string or bytes" + assert type(data) is bytes, \ + "write() argument must be a bytes instance" if not self.status: raise AssertionError("write() before start_response()") @@ -256,7 +255,7 @@ class BaseHandler: self.headers_sent = True if not self.origin_server or self.client_is_modern(): self.send_preamble() - self._write(str(self.headers)) + self._write(bytes(self.headers)) def result_is_file(self): @@ -376,12 +375,6 @@ class SimpleHandler(BaseHandler): self.environ.update(self.base_env) def _write(self,data): - if isinstance(data, str): - try: - data = data.encode("iso-8859-1") - except UnicodeEncodeError: - raise ValueError("Unicode data must contain only code points" - " representable in ISO-8859-1 encoding") self.stdout.write(data) def _flush(self): diff --git a/Lib/wsgiref/headers.py b/Lib/wsgiref/headers.py index cc01f5f3293..d93962831ae 100644 --- a/Lib/wsgiref/headers.py +++ b/Lib/wsgiref/headers.py @@ -30,21 +30,20 @@ class Headers: """Manage a collection of HTTP response headers""" def __init__(self,headers): - if not isinstance(headers, list): + if type(headers) is not list: raise TypeError("Headers must be a list of name/value tuples") - self._headers = [] - for k, v in headers: - k = self._convert_string_type(k) - v = self._convert_string_type(v) - self._headers.append((k, v)) + self._headers = headers + if __debug__: + for k, v in headers: + self._convert_string_type(k) + self._convert_string_type(v) def _convert_string_type(self, value): """Convert/check value type.""" - if isinstance(value, str): + if type(value) is str: return value - assert isinstance(value, bytes), ("Header names/values must be" - " a string or bytes object (not {0})".format(value)) - return str(value, "iso-8859-1") + raise AssertionError("Header names/values must be" + " of type str (got {0})".format(repr(value))) def __len__(self): """Return the total number of headers, including duplicates.""" @@ -139,6 +138,9 @@ class Headers: suitable for direct HTTP transmission.""" return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['','']) + def __bytes__(self): + return str(self).encode('iso-8859-1') + def setdefault(self,name,value): """Return first matching header value for 'name', or 'value' diff --git a/Lib/wsgiref/validate.py b/Lib/wsgiref/validate.py index 2df3f9f7fe9..05a485db7a7 100644 --- a/Lib/wsgiref/validate.py +++ b/Lib/wsgiref/validate.py @@ -128,11 +128,10 @@ def assert_(cond, *args): raise AssertionError(*args) def check_string_type(value, title): - if isinstance(value, str): + if type (value) is str: return value - assert isinstance(value, bytes), \ - "{0} must be a string or bytes object (not {1})".format(title, value) - return str(value, "iso-8859-1") + raise AssertionError( + "{0} must be of type str (got {1})".format(title, repr(value))) def validator(application): @@ -197,20 +196,21 @@ class InputWrapper: def read(self, *args): assert_(len(args) == 1) v = self.input.read(*args) - assert_(isinstance(v, bytes)) + assert_(type(v) is bytes) return v - def readline(self): - v = self.input.readline() - assert_(isinstance(v, bytes)) + def readline(self, *args): + assert_(len(args) <= 1) + v = self.input.readline(*args) + assert_(type(v) is bytes) return v def readlines(self, *args): assert_(len(args) <= 1) lines = self.input.readlines(*args) - assert_(isinstance(lines, list)) + assert_(type(lines) is list) for line in lines: - assert_(isinstance(line, bytes)) + assert_(type(line) is bytes) return lines def __iter__(self): @@ -229,7 +229,7 @@ class ErrorWrapper: self.errors = wsgi_errors def write(self, s): - assert_(isinstance(s, str)) + assert_(type(s) is str) self.errors.write(s) def flush(self): @@ -248,7 +248,7 @@ class WriteWrapper: self.writer = wsgi_writer def __call__(self, s): - assert_(isinstance(s, (str, bytes))) + assert_(type(s) is bytes) self.writer(s) class PartialIteratorWrapper: @@ -275,6 +275,8 @@ class IteratorWrapper: assert_(not self.closed, "Iterator read after closed") v = next(self.iterator) + if type(v) is not bytes: + assert_(False, "Iterator yielded non-bytestring (%r)" % (v,)) if self.check_start_response is not None: assert_(self.check_start_response, "The application returns and we started iterating over its body, but start_response has not yet been called") @@ -294,7 +296,7 @@ class IteratorWrapper: "Iterator garbage collected without being closed") def check_environ(environ): - assert_(isinstance(environ, dict), + assert_(type(environ) is dict, "Environment is not of the right type: %r (environment: %r)" % (type(environ), environ)) @@ -321,11 +323,11 @@ def check_environ(environ): if '.' in key: # Extension, we don't care about its type continue - assert_(isinstance(environ[key], str), + assert_(type(environ[key]) is str, "Environmental variable %s is not a string: %r (value: %r)" % (key, type(environ[key]), environ[key])) - assert_(isinstance(environ['wsgi.version'], tuple), + assert_(type(environ['wsgi.version']) is tuple, "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],)) assert_(environ['wsgi.url_scheme'] in ('http', 'https'), "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme']) @@ -385,12 +387,12 @@ def check_status(status): % status, WSGIWarning) def check_headers(headers): - assert_(isinstance(headers, list), + assert_(type(headers) is list, "Headers (%r) must be of type list: %r" % (headers, type(headers))) header_names = {} for item in headers: - assert_(isinstance(item, tuple), + assert_(type(item) is tuple, "Individual headers (%r) must be of type tuple: %r" % (item, type(item))) assert_(len(item) == 2) @@ -428,14 +430,14 @@ def check_content_type(status, headers): assert_(0, "No Content-Type header found in headers (%s)" % headers) def check_exc_info(exc_info): - assert_(exc_info is None or isinstance(exc_info, tuple), + assert_(exc_info is None or type(exc_info) is tuple, "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info))) # More exc_info checks? def check_iterator(iterator): - # Technically a string is legal, which is why it's a really bad + # Technically a bytestring is legal, which is why it's a really bad # idea, because it may cause the response to be returned # character-by-character assert_(not isinstance(iterator, (str, bytes)), "You should not return a string as your application iterator, " - "instead return a single-item list containing that string.") + "instead return a single-item list containing a bytestring.")