Issue #4718: Adapt the wsgiref package so that it actually works with Python 3.x,

in accordance with http://www.wsgi.org/wsgi/Amendments_1.0
This commit is contained in:
Antoine Pitrou 2009-01-03 18:41:49 +00:00
parent ffe431d8bd
commit 38a66adccb
8 changed files with 184 additions and 60 deletions

View File

@ -122,13 +122,13 @@ parameter expect a WSGI-compliant dictionary to be supplied; please see
def simple_app(environ, start_response):
setup_testing_defaults(environ)
status = '200 OK'
headers = [('Content-type', 'text/plain')]
status = b'200 OK'
headers = [(b'Content-type', b'text/plain; charset=utf-8')]
start_response(status, headers)
ret = ["%s: %s\n" % (key, value)
for key, value in environ.iteritems()]
ret = [("%s: %s\n" % (key, value)).encode("utf-8")
for key, value in environ.items()]
return ret
httpd = make_server('', 8000, simple_app)
@ -161,7 +161,7 @@ also provides these miscellaneous utilities:
Example usage::
from StringIO import StringIO
from io import StringIO
from wsgiref.util import FileWrapper
# We're using a StringIO-buffer for as the file-like object
@ -416,13 +416,13 @@ Paste" library.
# Our callable object which is intentionally not compliant to the
# standard, so the validator is going to break
def simple_app(environ, start_response):
status = '200 OK' # HTTP Status
headers = [('Content-type', 'text/plain')] # HTTP Headers
status = b'200 OK' # HTTP Status
headers = [(b'Content-type', b'text/plain')] # HTTP Headers
start_response(status, headers)
# This is going to break because we need to return a list, and
# the validator is going to inform us
return "Hello World"
return b"Hello World"
# This is the application wrapped in a validator
validator_app = validator(simple_app)
@ -509,7 +509,7 @@ input, output, and error streams.
.. method:: BaseHandler._write(data)
Buffer the string *data* for transmission to the client. It's okay if this
Buffer the bytes *data* for transmission to the client. It's okay if this
method actually transmits the data; :class:`BaseHandler` just separates write
and flush operations for greater efficiency when the underlying system actually
has such a distinction.
@ -712,12 +712,12 @@ This is a working "Hello World" WSGI application::
# is a dictionary containing CGI-style envrironment variables and the
# second variable is the callable object (see PEP333)
def hello_world_app(environ, start_response):
status = '200 OK' # HTTP Status
headers = [('Content-type', 'text/plain')] # HTTP Headers
status = b'200 OK' # HTTP Status
headers = [(b'Content-type', b'text/plain; charset=utf-8')] # HTTP Headers
start_response(status, headers)
# The returned object is going to be printed
return ["Hello World"]
return [b"Hello World"]
httpd = make_server('', 8000, hello_world_app)
print("Serving on port 8000...")

View File

@ -50,7 +50,7 @@ def hello_app(environ,start_response):
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
server = make_server("", 80, app, MockServer, MockHandler)
inp = BufferedReader(BytesIO(data))
out = StringIO()
out = BytesIO()
olderr = sys.stderr
err = sys.stderr = StringIO()
@ -128,13 +128,13 @@ class IntegrationTests(TestCase):
def check_hello(self, out, has_length=True):
self.assertEqual(out,
"HTTP/1.0 200 OK\r\n"
("HTTP/1.0 200 OK\r\n"
"Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
"Content-Type: text/plain\r\n"
"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
(has_length and "Content-Length: 13\r\n" or "") +
"\r\n"
"Hello, world!"
"Hello, world!").encode("iso-8859-1")
)
def test_plain_hello(self):
@ -152,7 +152,7 @@ class IntegrationTests(TestCase):
return ["Hello, world!"]
out, err = run_amock(validator(bad_app))
self.failUnless(out.endswith(
"A server error occurred. Please contact the administrator."
b"A server error occurred. Please contact the administrator."
))
self.assertEqual(
err.splitlines()[-2],
@ -160,7 +160,36 @@ class IntegrationTests(TestCase):
" be of type list: <class 'tuple'>"
)
def test_wsgi_input(self):
def bad_app(e,s):
e["wsgi.input"].read()
s(b"200 OK", [(b"Content-Type", b"text/plain; charset=utf-8")])
return [b"data"]
out, err = run_amock(validator(bad_app))
self.failUnless(out.endswith(
b"A server error occurred. Please contact the administrator."
))
self.assertEqual(
err.splitlines()[-2], "AssertionError"
)
def test_bytes_validation(self):
def app(e, s):
s(b"200 OK", [
(b"Content-Type", b"text/plain; charset=utf-8"),
("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
])
return [b"data"]
out, err = run_amock(validator(app))
self.failUnless(err.endswith('"GET / HTTP/1.0" 200 4\n'))
self.assertEqual(
b"HTTP/1.0 200 OK\r\n"
b"Server: WSGIServer/0.1 Python/3.1a0\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n"
b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
b"\r\n"
b"data",
out)
@ -181,6 +210,8 @@ class UtilityTests(TestCase):
util.setup_testing_defaults(env)
if isinstance(value,StringIO):
self.failUnless(isinstance(env[key],StringIO))
elif isinstance(value,BytesIO):
self.failUnless(isinstance(env[key],BytesIO))
else:
self.assertEqual(env[key],value)
@ -260,7 +291,7 @@ class UtilityTests(TestCase):
('wsgi.run_once', 0),
('wsgi.multithread', 0),
('wsgi.multiprocess', 0),
('wsgi.input', StringIO("")),
('wsgi.input', BytesIO()),
('wsgi.errors', StringIO()),
('wsgi.url_scheme','http'),
]:
@ -386,6 +417,23 @@ class HeaderTests(TestCase):
'\r\n'
)
def testBytes(self):
h = Headers([
(b"Content-Type", b"text/plain; charset=utf-8"),
])
self.assertEqual("text/plain; charset=utf-8", h.get("Content-Type"))
h[b"Foo"] = bytes(b"bar")
self.assertEqual("bar", h.get("Foo"))
h.setdefault(b"Bar", b"foo")
self.assertEqual("foo", h.get("Bar"))
h.add_header(b'content-disposition', b'attachment',
filename=b'bud.gif')
self.assertEqual('attachment; filename="bud.gif"',
h.get("content-disposition"))
class ErrorHandler(BaseCGIHandler):
"""Simple handler subclass for testing BaseHandler"""
@ -393,7 +441,7 @@ class ErrorHandler(BaseCGIHandler):
def __init__(self,**kw):
setup_testing_defaults(kw)
BaseCGIHandler.__init__(
self, StringIO(''), StringIO(), StringIO(), kw,
self, BytesIO(), BytesIO(), StringIO(), kw,
multithread=True, multiprocess=True
)
@ -474,21 +522,32 @@ class HandlerTests(TestCase):
s('200 OK',[])(e['wsgi.url_scheme'])
return []
def trivial_app3(e,s):
s('200 OK',[])
return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]
h = TestHandler()
h.run(trivial_app1)
self.assertEqual(h.stdout.getvalue(),
"Status: 200 OK\r\n"
("Status: 200 OK\r\n"
"Content-Length: 4\r\n"
"\r\n"
"http")
"http").encode("iso-8859-1"))
h = TestHandler()
h.run(trivial_app2)
self.assertEqual(h.stdout.getvalue(),
"Status: 200 OK\r\n"
("Status: 200 OK\r\n"
"\r\n"
"http")
"http").encode("iso-8859-1"))
h = TestHandler()
h.run(trivial_app3)
self.assertEqual(h.stdout.getvalue(),
b'Status: 200 OK\r\n'
b'Content-Length: 8\r\n'
b'\r\n'
b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')
@ -507,18 +566,19 @@ class HandlerTests(TestCase):
h = ErrorHandler()
h.run(non_error_app)
self.assertEqual(h.stdout.getvalue(),
"Status: 200 OK\r\n"
("Status: 200 OK\r\n"
"Content-Length: 0\r\n"
"\r\n")
"\r\n").encode("iso-8859-1"))
self.assertEqual(h.stderr.getvalue(),"")
h = ErrorHandler()
h.run(error_app)
self.assertEqual(h.stdout.getvalue(),
"Status: %s\r\n"
("Status: %s\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: %d\r\n"
"\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
"\r\n%s" % (h.error_status,len(h.error_body),h.error_body)
).encode("iso-8859-1"))
self.failUnless("AssertionError" in h.stderr.getvalue())
@ -531,8 +591,8 @@ class HandlerTests(TestCase):
h = ErrorHandler()
h.run(error_app)
self.assertEqual(h.stdout.getvalue(),
"Status: 200 OK\r\n"
"\r\n"+MSG)
("Status: 200 OK\r\n"
"\r\n"+MSG).encode("iso-8859-1"))
self.failUnless("AssertionError" in h.stderr.getvalue())
@ -549,7 +609,7 @@ class HandlerTests(TestCase):
)
shortpat = (
"Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
)
).encode("iso-8859-1")
for ssw in "FooBar/1.0", None:
sw = ssw and "Server: %s\r\n" % ssw or ""
@ -570,13 +630,31 @@ class HandlerTests(TestCase):
h.server_software = ssw
h.run(non_error_app)
if proto=="HTTP/0.9":
self.assertEqual(h.stdout.getvalue(),"")
self.assertEqual(h.stdout.getvalue(),b"")
else:
self.failUnless(
re.match(stdpat%(version,sw), h.stdout.getvalue()),
(stdpat%(version,sw), h.stdout.getvalue())
re.match((stdpat%(version,sw)).encode("iso-8859-1"),
h.stdout.getvalue()),
((stdpat%(version,sw)).encode("iso-8859-1"),
h.stdout.getvalue())
)
def testBytesData(self):
def app(e, s):
s(b"200 OK", [
(b"Content-Type", b"text/plain; charset=utf-8"),
])
return [b"data"]
h = TestHandler()
h.run(app)
self.assertEqual(b"Status: 200 OK\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n"
b"Content-Length: 4\r\n"
b"\r\n"
b"data",
h.stdout.getvalue())
# This epilogue is needed for compatibility with the Python 2.5 regrtest module
def test_main():

View File

@ -157,19 +157,29 @@ class BaseHandler:
elif self.headers is not None:
raise AssertionError("Headers already set!")
assert type(status) is str,"Status must be a string"
status = self._convert_string_type(status, "Status")
assert len(status)>=4,"Status must be at least 4 characters"
assert int(status[:3]),"Status message must begin w/3-digit code"
assert status[3]==" ", "Status message must have a space after code"
if __debug__:
str_headers = []
for name,val in headers:
assert type(name) is str,"Header names must be strings"
assert type(val) is str,"Header values must be strings"
name = self._convert_string_type(name, "Header name")
val = self._convert_string_type(val, "Header value")
str_headers.append((name, val))
assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
self.status = status
self.headers = self.headers_class(headers)
self.headers = self.headers_class(str_headers)
return self.write
def _convert_string_type(self, value, title):
"""Convert/check value type."""
if isinstance(value, str):
return value
assert isinstance(value, bytes), \
"{0} must be a string or bytes object (not {1})".format(title, value)
return str(value, "iso-8859-1")
def send_preamble(self):
"""Transmit version/status/date/server, via self._write()"""
@ -188,7 +198,8 @@ class BaseHandler:
def write(self, data):
"""'write()' callable as specified by PEP 333"""
assert type(data) is str,"write() argument must be string"
assert isinstance(data, (str, bytes)), \
"write() argument must be a string or bytes"
if not self.status:
raise AssertionError("write() before start_response()")
@ -382,8 +393,13 @@ class SimpleHandler(BaseHandler):
self.environ.update(self.base_env)
def _write(self,data):
if isinstance(data, str):
try:
data = data.encode("iso-8859-1")
except UnicodeEncodeError:
raise ValueError("Unicode data must contain only code points"
" representable in ISO-8859-1 encoding")
self.stdout.write(data)
self._write = self.stdout.write
def _flush(self):
self.stdout.flush()

View File

@ -44,7 +44,19 @@ class Headers:
def __init__(self,headers):
if not isinstance(headers, list):
raise TypeError("Headers must be a list of name/value tuples")
self._headers = headers
self._headers = []
for k, v in headers:
k = self._convert_string_type(k)
v = self._convert_string_type(v)
self._headers.append((k, v))
def _convert_string_type(self, value):
"""Convert/check value type."""
if isinstance(value, str):
return value
assert isinstance(value, bytes), ("Header names/values must be"
" a string or bytes object (not {0})".format(value))
return str(value, "iso-8859-1")
def __len__(self):
"""Return the total number of headers, including duplicates."""
@ -53,7 +65,8 @@ class Headers:
def __setitem__(self, name, val):
"""Set the value of a header."""
del self[name]
self._headers.append((name, val))
self._headers.append(
(self._convert_string_type(name), self._convert_string_type(val)))
def __delitem__(self,name):
"""Delete all occurrences of a header, if present.
@ -152,7 +165,8 @@ class Headers:
and value 'value'."""
result = self.get(name)
if result is None:
self._headers.append((name,value))
self._headers.append((self._convert_string_type(name),
self._convert_string_type(value)))
return value
else:
return result
@ -176,13 +190,16 @@ class Headers:
"""
parts = []
if _value is not None:
_value = self._convert_string_type(_value)
parts.append(_value)
for k, v in _params.items():
k = self._convert_string_type(k)
if v is None:
parts.append(k.replace('_', '-'))
else:
v = self._convert_string_type(v)
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((_name, "; ".join(parts)))
self._headers.append((self._convert_string_type(_name), "; ".join(parts)))

View File

@ -111,8 +111,7 @@ class WSGIRequestHandler(BaseHTTPRequestHandler):
if length:
env['CONTENT_LENGTH'] = length
for h in self.headers:
k,v = h.split(':',1)
for k, v in self.headers.items():
k=k.replace('-','_').upper(); v=v.strip()
if k in env:
continue # skip content length, type,etc.
@ -168,11 +167,11 @@ def demo_app(environ,start_response):
stdout = StringIO()
print("Hello world!", file=stdout)
print(file=stdout)
h = environ.items(); h.sort()
h = sorted(environ.items())
for k,v in h:
print(k,'=',repr(v), file=stdout)
start_response("200 OK", [('Content-Type','text/plain')])
return [stdout.getvalue()]
start_response(b"200 OK", [(b'Content-Type',b'text/plain; charset=utf-8')])
return [stdout.getvalue().encode("utf-8")]
def make_server(

View File

@ -149,8 +149,8 @@ def setup_testing_defaults(environ):
environ.setdefault('wsgi.multithread', 0)
environ.setdefault('wsgi.multiprocess', 0)
from io import StringIO
environ.setdefault('wsgi.input', StringIO(""))
from io import StringIO, BytesIO
environ.setdefault('wsgi.input', BytesIO())
environ.setdefault('wsgi.errors', StringIO())
environ.setdefault('wsgi.url_scheme',guess_scheme(environ))

View File

@ -127,6 +127,13 @@ def assert_(cond, *args):
if not cond:
raise AssertionError(*args)
def check_string_type(value, title):
if isinstance(value, str):
return value
assert isinstance(value, bytes), \
"{0} must be a string or bytes object (not {1})".format(title, value)
return str(value, "iso-8859-1")
def validator(application):
"""
@ -188,14 +195,14 @@ class InputWrapper:
self.input = wsgi_input
def read(self, *args):
assert_(len(args) <= 1)
assert_(len(args) == 1)
v = self.input.read(*args)
assert_(isinstance(v, str))
assert_(isinstance(v, bytes))
return v
def readline(self):
v = self.input.readline()
assert_(isinstance(v, str))
assert_(isinstance(v, bytes))
return v
def readlines(self, *args):
@ -203,7 +210,7 @@ class InputWrapper:
lines = self.input.readlines(*args)
assert_(isinstance(lines, list))
for line in lines:
assert_(isinstance(line, str))
assert_(isinstance(line, bytes))
return lines
def __iter__(self):
@ -241,7 +248,7 @@ class WriteWrapper:
self.writer = wsgi_writer
def __call__(self, s):
assert_(isinstance(s, str))
assert_(isinstance(s, (str, bytes)))
self.writer(s)
class PartialIteratorWrapper:
@ -364,8 +371,7 @@ def check_errors(wsgi_errors):
% (wsgi_errors, attr))
def check_status(status):
assert_(isinstance(status, str),
"Status must be a string (not %r)" % status)
status = check_string_type(status, "Status")
# Implicitly check that we can turn it into an integer:
status_code = status.split(None, 1)[0]
assert_(len(status_code) == 3,
@ -389,6 +395,8 @@ def check_headers(headers):
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
name = check_string_type(name, "Header name")
value = check_string_type(value, "Header value")
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
@ -404,11 +412,13 @@ def check_headers(headers):
% (value, bad_header_value_re.search(value).group(0)))
def check_content_type(status, headers):
status = check_string_type(status, "Status")
code = int(status.split(None, 1)[0])
# @@: need one more person to verify this interpretation of RFC 2616
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
NO_MESSAGE_BODY = (204, 304)
for name, value in headers:
name = check_string_type(name, "Header name")
if name.lower() == 'content-type':
if code not in NO_MESSAGE_BODY:
return
@ -426,6 +436,6 @@ def check_iterator(iterator):
# Technically a string is legal, which is why it's a really bad
# idea, because it may cause the response to be returned
# character-by-character
assert_(not isinstance(iterator, str),
assert_(not isinstance(iterator, (str, bytes)),
"You should not return a string as your application iterator, "
"instead return a single-item list containing that string.")

View File

@ -84,6 +84,10 @@ Core and Builtins
Library
-------
- Issue #4718: Adapt the wsgiref package so that it actually works with
Python 3.x, in accordance with the `official amendments of the spec
<http://www.wsgi.org/wsgi/Amendments_1.0>`_.
- Issue #4812: add missing underscore prefix to some internal-use-only
constants in the decimal module. (Dec_0 becomes _Dec_0, etc.)