refactored a part of the directory body so any customizing caller can use that part to get a link for their implementation

Removed encoding from the scope of the directory_body method

Ran the IDEs auto indent/ auto format
This commit is contained in:
Mikkel Juul 2020-11-20 09:31:32 +01:00
parent 3bea9f9b03
commit ebd6de1f31
1 changed files with 65 additions and 59 deletions

View File

@ -31,7 +31,6 @@ XXX To do:
- send error log to separate file
"""
# See also:
#
# HTTP Working Group T. Berners-Lee
@ -108,7 +107,6 @@ from functools import partial
from http import HTTPStatus
# Default error message template
DEFAULT_ERROR_MESSAGE = """\
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
@ -129,8 +127,8 @@ DEFAULT_ERROR_MESSAGE = """\
DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
class HTTPServer(socketserver.TCPServer):
class HTTPServer(socketserver.TCPServer):
allow_reuse_address = 1 # Seems to make sense in testing environment
def server_bind(self):
@ -146,7 +144,6 @@ class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
"""HTTP request handler base class.
The following explanation of HTTP serves to guide you through the
@ -413,9 +410,9 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
return
method = getattr(self, mname)
method()
self.wfile.flush() #actually send the response if not already done.
self.wfile.flush() # actually send the response if not already done.
except socket.timeout as e:
#a read or a write timed out. Discard this connection
# a read or a write timed out. Discard this connection
self.log_error("Request timed out: %r", e)
self.close_connection = True
return
@ -578,7 +575,7 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
sys.stderr.write("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format%args))
format % args))
def version_string(self):
"""Return the server software version string."""
@ -626,7 +623,6 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""Simple HTTP request handler with GET and HEAD commands.
This serves files from the current directory and any of its
@ -763,21 +759,21 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""
try:
list = os.listdir(path)
list_of_files = os.listdir(path)
except OSError:
self.send_error(
HTTPStatus.NOT_FOUND,
"No permission to list directory")
return None
list.sort(key=lambda a: a.lower())
list_of_files.sort(key=lambda a: a.lower())
try:
displaypath = urllib.parse.unquote(self.path,
display_path = urllib.parse.unquote(self.path,
errors='surrogatepass')
except UnicodeDecodeError:
displaypath = urllib.parse.unquote(path)
displaypath = html.escape(displaypath, quote=False)
display_path = urllib.parse.unquote(path)
display_path = html.escape(display_path, quote=False)
enc = sys.getfilesystemencoding()
encoded = self.encoded_directory_body(list, displaypath, path, enc)
encoded = self.directory_body(list_of_files, display_path, path, enc).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
@ -795,8 +791,8 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""
# abandon query parameters
path = path.split('?',1)[0]
path = path.split('#',1)[0]
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
trailing_slash = path.rstrip().endswith('/')
try:
@ -857,14 +853,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
return guess
return 'application/octet-stream'
def encoded_directory_body(self, list_of_files, displaypath, actual_path, enc) -> bytes:
def directory_body(self, list_of_files, displaypath, actual_path, enc) -> str:
"""
Compose and encode the list_of_files into HTML
displaypath is a relative path str
actual_path is the full actual filesystem path
enc is the encoding of the system `sys.getfilesystemencoding()`
return the encoded body text
return the body text
Override this method to change how the server displays the files
Example:
@ -876,7 +872,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
... whatever you want
Handler = SimpleHTTPRequestHandler
Handler.encoded_directory_body = my_beautiful_body
Handler.directory_body = my_beautiful_body
httpd = TCPServer(("", PORT), Handler)
@ -894,21 +890,10 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
r.append('<body>\n<h1>%s</h1>' % title)
r.append('<hr>\n<ul>')
for name in list_of_files:
fullname = os.path.join(actual_path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + "/"
linkname = name + "/"
if os.path.islink(fullname):
displayname = name + "@"
# Note: a link to a directory displays with @ and links with /
r.append('<li><a href="%s">%s</a></li>'
% (urllib.parse.quote(linkname,
errors='surrogatepass'),
html.escape(displayname, quote=False)))
% self.link_and_display_name(actual_path, name))
r.append('</ul>\n<hr>\n</body>\n</html>\n')
return '\n'.join(r).encode(enc, 'surrogateescape')
return '\n'.join(r)
def send_directory_headers(self, enc, content_length):
"""
@ -921,14 +906,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
from json import dumps
def my_json_body(self, list_of_files, displaypath, actual_path, enc) -> bytes:
return dumps({'files: list_of_files}).encode(enc, 'surrogateescape')
return dumps({'files: list_of_files})
def send_json_headers(self, enc, len):
self.send_header('Content-type', 'application/json; charset=%s' % enc)
self.send_header('Content-Length', len)
Handler = SimpleHTTPRequestHandler
Handler.encoded_directory_body = my_json_body
Handler.directory_body = my_json_body
Handler.send_directory_headers = send_json_headers
httpd = TCPServer(("", PORT), Handler)
@ -939,6 +924,26 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
self.send_header("Content-type", "text/html; charset=%s" % enc)
self.send_header("Content-Length", content_length)
@staticmethod
def link_and_display_name(actual_path, name) -> (str, str):
"""
Returns the url link and a displayname of a file given a path
other users may want to use this method when customizing the html response of the handler
"""
fullname = os.path.join(actual_path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + "/"
linkname = name + "/"
if os.path.islink(fullname):
displayname = name + "@"
# Note: a link to a directory displays with @ and links with /
return (urllib.parse.quote(linkname,
errors='surrogatepass'),
html.escape(displayname, quote=False))
# Utilities for CGIHTTPRequestHandler
@ -968,7 +973,7 @@ def _url_collapse_path(path):
if part == '..':
head_parts.pop() # IndexError if more '..' than prior parts
elif part and part != '.':
head_parts.append( part )
head_parts.append(part)
if path_parts:
tail_part = path_parts.pop()
if tail_part:
@ -989,9 +994,9 @@ def _url_collapse_path(path):
return collapsed_path
nobody = None
def nobody_uid():
"""Internal routine to get nobody's uid"""
global nobody
@ -1014,7 +1019,6 @@ def executable(path):
class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
"""Complete HTTP server with GET, HEAD and POST commands.
GET and HEAD also support running CGI scripts.
@ -1069,14 +1073,13 @@ class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
collapsed_path = _url_collapse_path(self.path)
dir_sep = collapsed_path.find('/', 1)
while dir_sep > 0 and not collapsed_path[:dir_sep] in self.cgi_directories:
dir_sep = collapsed_path.find('/', dir_sep+1)
dir_sep = collapsed_path.find('/', dir_sep + 1)
if dir_sep > 0:
head, tail = collapsed_path[:dir_sep], collapsed_path[dir_sep+1:]
head, tail = collapsed_path[:dir_sep], collapsed_path[dir_sep + 1:]
self.cgi_info = head, tail
return True
return False
cgi_directories = ['/cgi-bin', '/htbin']
def is_executable(self, path):
@ -1092,15 +1095,15 @@ class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
"""Execute a CGI script."""
dir, rest = self.cgi_info
path = dir + '/' + rest
i = path.find('/', len(dir)+1)
i = path.find('/', len(dir) + 1)
while i >= 0:
nextdir = path[:i]
nextrest = path[i+1:]
nextrest = path[i + 1:]
scriptdir = self.translate_path(nextdir)
if os.path.isdir(scriptdir):
dir, rest = nextdir, nextrest
i = path.find('/', len(dir)+1)
i = path.find('/', len(dir) + 1)
else:
break
@ -1160,7 +1163,7 @@ class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
if authorization[0].lower() == "basic":
try:
authorization = authorization[1].encode('ascii')
authorization = base64.decodebytes(authorization).\
authorization = base64.decodebytes(authorization). \
decode('ascii')
except (binascii.Error, UnicodeError):
pass
@ -1258,7 +1261,7 @@ class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env = env
env=env
)
if self.command.lower() == "post" and nbytes > 0:
data = self.rfile.read(nbytes)
@ -1315,6 +1318,7 @@ def test(HandlerClass=BaseHTTPRequestHandler,
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
if __name__ == '__main__':
import argparse
@ -1338,6 +1342,7 @@ if __name__ == '__main__':
handler_class = partial(SimpleHTTPRequestHandler,
directory=args.directory)
# ensure dual-stack is not disabled; ref #38907
class DualStackServer(ThreadingHTTPServer):
def server_bind(self):
@ -1347,6 +1352,7 @@ if __name__ == '__main__':
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
test(
HandlerClass=handler_class,
ServerClass=DualStackServer,