378 lines
13 KiB
Python
378 lines
13 KiB
Python
"""CGI-savvy HTTP Server.
|
|
|
|
This module builds on SimpleHTTPServer by implementing GET and POST
|
|
requests to cgi-bin scripts.
|
|
|
|
If the os.fork() function is not present (e.g. on Windows),
|
|
os.popen2() is used as a fallback, with slightly altered semantics; if
|
|
that function is not present either (e.g. on Macintosh), only Python
|
|
scripts are supported, and they are executed by the current process.
|
|
|
|
In all cases, the implementation is intentionally naive -- all
|
|
requests are executed synchronously.
|
|
|
|
SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
|
|
-- it may execute arbitrary Python code or external programs.
|
|
|
|
Note that status code 200 is sent prior to execution of a CGI script, so
|
|
scripts cannot send other status codes such as 302 (redirect).
|
|
"""
|
|
|
|
|
|
__version__ = "0.4"
|
|
|
|
__all__ = ["CGIHTTPRequestHandler"]
|
|
|
|
import os
|
|
import sys
|
|
import urllib
|
|
import BaseHTTPServer
|
|
import SimpleHTTPServer
|
|
import select
|
|
import copy
|
|
|
|
|
|
class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):

    """Complete HTTP server with GET, HEAD and POST commands.

    GET and HEAD also support running CGI scripts.

    The POST command is *only* implemented for CGI scripts.

    """

    # Determine platform specifics.  have_fork selects the fork/execve
    # code path in run_cgi(); the popen flags are only consulted when
    # deciding whether a non-Python script can be launched at all.
    have_fork = hasattr(os, 'fork')
    have_popen2 = hasattr(os, 'popen2')
    have_popen3 = hasattr(os, 'popen3')

    # Make rfile unbuffered -- we need to read one line and then pass
    # the rest to a subprocess, so we can't use buffered input.
    rbufsize = 0

    # URL directories whose contents are treated as CGI scripts.
    cgi_directories = ['/cgi-bin', '/htbin']

    def do_POST(self):
        """Serve a POST request.

        This is only implemented for CGI scripts; any other target is
        answered with a 501 error.

        """
        if self.is_cgi():
            self.run_cgi()
        else:
            self.send_error(501, "Can only POST to CGI scripts")

    def send_head(self):
        """Version of send_head that supports CGI scripts.

        CGI paths are handed to run_cgi(); everything else is delegated
        to SimpleHTTPRequestHandler.send_head().
        """
        if self.is_cgi():
            return self.run_cgi()
        return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

    def is_cgi(self):
        """Test whether self.path corresponds to a CGI script.

        Returns True and updates the cgi_info attribute to the tuple
        (dir, rest) if self.path requires running a CGI script.
        Returns False otherwise.

        If any exception is raised, the caller should assume that
        self.path was rejected as invalid and act accordingly.

        The default implementation tests whether the normalized url
        path begins with one of the strings in self.cgi_directories
        (and the next character is a '/' or the end of the string).
        """
        collapsed_path = _url_collapse_path(self.path)
        # Start the search at index 1 to skip the leading '/'.
        dir_sep = collapsed_path.find('/', 1)
        head, tail = collapsed_path[:dir_sep], collapsed_path[dir_sep+1:]
        if head in self.cgi_directories:
            self.cgi_info = head, tail
            return True
        return False

    def is_executable(self, path):
        """Test whether argument path is an executable file."""
        return executable(path)

    def is_python(self, path):
        """Test whether argument path is a Python script (.py or .pyw)."""
        head, tail = os.path.splitext(path)
        return tail.lower() in (".py", ".pyw")

    def run_cgi(self):
        """Execute a CGI script.

        Works from the (dir, rest) pair stored in self.cgi_info by
        is_cgi().  Replies 404 when the script does not exist and 403 when
        it is not a plain file, or when a non-Python script is not
        executable or cannot be launched on this platform.  Otherwise
        status 200 is sent and the script is run with a CGI environment:
        through fork/execve where os.fork exists, through the subprocess
        module elsewhere.
        """
        cgi_dir, rest = self.cgi_info

        # The query component starts at the first '?'.  Split it off
        # before inspecting the path so that nothing inside it ('/' or a
        # further '?') is mistaken for path structure.
        rest, _, query = rest.partition('?')

        # Descend into sub-directories of the CGI directory.  Candidates
        # are resolved *below* cgi_dir, so a same-named directory located
        # elsewhere under the server root can never be selected.
        i = rest.find('/')
        while i >= 0:
            nextdir = cgi_dir + '/' + rest[:i]
            nextrest = rest[i+1:]
            if not os.path.isdir(self.translate_path(nextdir)):
                break
            cgi_dir, rest = nextdir, nextrest
            i = rest.find('/')

        # dissect the part after the directory name into a script name &
        # a possible additional path, to be stored in PATH_INFO.
        i = rest.find('/')
        if i >= 0:
            script, rest = rest[:i], rest[i:]
        else:
            script, rest = rest, ''

        scriptname = cgi_dir + '/' + script
        scriptfile = self.translate_path(scriptname)
        if not os.path.exists(scriptfile):
            self.send_error(404, "No such CGI script (%r)" % scriptname)
            return
        if not os.path.isfile(scriptfile):
            self.send_error(403, "CGI script is not a plain file (%r)" %
                            scriptname)
            return
        ispy = self.is_python(scriptname)
        if not ispy:
            if not (self.have_fork or self.have_popen2 or self.have_popen3):
                self.send_error(403, "CGI script is not a Python script (%r)" %
                                scriptname)
                return
            if not self.is_executable(scriptfile):
                self.send_error(403, "CGI script is not executable (%r)" %
                                scriptname)
                return

        # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
        # XXX Much of the following could be prepared ahead of time!
        env = copy.deepcopy(os.environ)
        env['SERVER_SOFTWARE'] = self.version_string()
        env['SERVER_NAME'] = self.server.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PROTOCOL'] = self.protocol_version
        env['SERVER_PORT'] = str(self.server.server_port)
        env['REQUEST_METHOD'] = self.command
        uqrest = urllib.unquote(rest)
        env['PATH_INFO'] = uqrest
        env['PATH_TRANSLATED'] = self.translate_path(uqrest)
        env['SCRIPT_NAME'] = scriptname
        if query:
            env['QUERY_STRING'] = query
        host = self.address_string()
        if host != self.client_address[0]:
            env['REMOTE_HOST'] = host
        env['REMOTE_ADDR'] = self.client_address[0]
        authorization = self.headers.getheader("authorization")
        if authorization:
            authorization = authorization.split()
            if len(authorization) == 2:
                import base64
                import binascii
                env['AUTH_TYPE'] = authorization[0]
                if authorization[0].lower() == "basic":
                    try:
                        credentials = base64.decodestring(authorization[1])
                    except binascii.Error:
                        pass
                    else:
                        # Split on the first ':' only: the password part
                        # may itself contain colons.
                        user, sep, _ = credentials.partition(':')
                        if sep:
                            env['REMOTE_USER'] = user
        # XXX REMOTE_IDENT
        if self.headers.typeheader is None:
            env['CONTENT_TYPE'] = self.headers.type
        else:
            env['CONTENT_TYPE'] = self.headers.typeheader
        length = self.headers.getheader('content-length')
        if length:
            env['CONTENT_LENGTH'] = length
        referer = self.headers.getheader('referer')
        if referer:
            env['HTTP_REFERER'] = referer
        accept = []
        for line in self.headers.getallmatchingheaders('accept'):
            if line[:1] in "\t\n\r ":
                # Continuation line of a folded header.
                accept.append(line.strip())
            else:
                # Skip the 7-character "Accept:" prefix.
                accept = accept + line[7:].split(',')
        env['HTTP_ACCEPT'] = ','.join(accept)
        ua = self.headers.getheader('user-agent')
        if ua:
            env['HTTP_USER_AGENT'] = ua
        co = filter(None, self.headers.getheaders('cookie'))
        if co:
            env['HTTP_COOKIE'] = ', '.join(co)
        # XXX Other HTTP_* headers
        # Since we're setting the env in the parent, provide empty
        # values to override previously set values
        for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
                  'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
            env.setdefault(k, "")

        self.send_response(200, "Script output follows")

        # A query without '=' is passed to the script as a command-line
        # argument, with '+' turned into spaces.
        decoded_query = query.replace('+', ' ')

        if self.have_fork:
            # Unix -- fork as we should
            args = [script]
            if '=' not in decoded_query:
                args.append(decoded_query)
            nobody = nobody_uid()
            self.wfile.flush() # Always flush before forking
            pid = os.fork()
            if pid != 0:
                # Parent
                pid, sts = os.waitpid(pid, 0)
                # throw away additional data [see bug #427345]
                while select.select([self.rfile], [], [], 0)[0]:
                    if not self.rfile.read(1):
                        break
                if sts:
                    self.log_error("CGI script exit status %#x", sts)
                return
            # Child
            try:
                try:
                    os.setuid(nobody)
                except os.error:
                    # Best effort: carry on under the current uid.
                    pass
                os.dup2(self.rfile.fileno(), 0)
                os.dup2(self.wfile.fileno(), 1)
                os.execve(scriptfile, args, env)
            except:
                # Deliberately catches everything: whatever went wrong,
                # the child must report it and _exit() instead of
                # returning into the server's request loop.
                self.server.handle_error(self.request, self.client_address)
                os._exit(127)

        else:
            # Non Unix - use subprocess
            import subprocess
            cmdline = [scriptfile]
            if self.is_python(scriptfile):
                interp = sys.executable
                if interp.lower().endswith("w.exe"):
                    # On Windows, use python.exe, not pythonw.exe
                    interp = interp[:-5] + interp[-4:]
                cmdline = [interp, '-u'] + cmdline
            # Same argument as the fork branch passes: the decoded query.
            if '=' not in decoded_query:
                cmdline.append(decoded_query)

            self.log_message("command: %s", subprocess.list2cmdline(cmdline))
            try:
                nbytes = int(length)
            except (TypeError, ValueError):
                # Missing or malformed Content-Length: no body to forward.
                nbytes = 0
            p = subprocess.Popen(cmdline,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=env)
            if self.command.lower() == "post" and nbytes > 0:
                data = self.rfile.read(nbytes)
            else:
                data = None
            # throw away additional data [see bug #427345]
            while select.select([self.rfile._sock], [], [], 0)[0]:
                if not self.rfile._sock.recv(1):
                    break
            stdout, stderr = p.communicate(data)
            self.wfile.write(stdout)
            if stderr:
                self.log_error('%s', stderr)
            p.stderr.close()
            p.stdout.close()
            status = p.returncode
            if status:
                self.log_error("CGI script exit status %#x", status)
            else:
                self.log_message("CGI script exited OK")
|
|
|
|
|
|
def _url_collapse_path(path):
|
|
"""
|
|
Given a URL path, remove extra '/'s and '.' path elements and collapse
|
|
any '..' references and returns a colllapsed path.
|
|
|
|
Implements something akin to RFC-2396 5.2 step 6 to parse relative paths.
|
|
The utility of this function is limited to is_cgi method and helps
|
|
preventing some security attacks.
|
|
|
|
Returns: A tuple of (head, tail) where tail is everything after the final /
|
|
and head is everything before it. Head will always start with a '/' and,
|
|
if it contains anything else, never have a trailing '/'.
|
|
|
|
Raises: IndexError if too many '..' occur within the path.
|
|
|
|
"""
|
|
# Similar to os.path.split(os.path.normpath(path)) but specific to URL
|
|
# path semantics rather than local operating system semantics.
|
|
path_parts = path.split('/')
|
|
head_parts = []
|
|
for part in path_parts[:-1]:
|
|
if part == '..':
|
|
head_parts.pop() # IndexError if more '..' than prior parts
|
|
elif part and part != '.':
|
|
head_parts.append( part )
|
|
if path_parts:
|
|
tail_part = path_parts.pop()
|
|
if tail_part:
|
|
if tail_part == '..':
|
|
head_parts.pop()
|
|
tail_part = ''
|
|
elif tail_part == '.':
|
|
tail_part = ''
|
|
else:
|
|
tail_part = ''
|
|
|
|
splitpath = ('/' + '/'.join(head_parts), tail_part)
|
|
collapsed_path = "/".join(splitpath)
|
|
|
|
return collapsed_path
|
|
|
|
|
|
# Cached uid for the 'nobody' account; filled in by nobody_uid().
nobody = None


def nobody_uid():
    """Internal routine to get nobody's uid.

    The value is looked up once and cached in the module-level `nobody`.
    When the password database has no 'nobody' entry, one more than the
    highest uid found there is used.  Returns -1 (without caching) when
    the pwd module cannot be imported.
    """
    global nobody
    if not nobody:
        try:
            import pwd
        except ImportError:
            return -1
        try:
            nobody = pwd.getpwnam('nobody')[2]
        except KeyError:
            nobody = max(entry[2] for entry in pwd.getpwall()) + 1
    return nobody
|
|
|
|
|
|
def executable(path):
    """Test for executable file.

    Returns True when any execute permission bit (user, group or other)
    is set on `path`, and False when none is set or the path cannot be
    stat()ed.
    """
    try:
        mode = os.stat(path).st_mode
    except os.error:
        return False
    # 0o111 == execute bits for user, group and other.  The '0o' spelling
    # is accepted by Python 2.6+ and Python 3 alike, unlike a bare '0111'.
    return mode & 0o111 != 0
|
|
|
|
|
|
def test(HandlerClass = CGIHTTPRequestHandler,
         ServerClass = BaseHTTPServer.HTTPServer):
    """Hand the given handler and server classes to SimpleHTTPServer.test().

    The defaults select the CGI-capable request handler defined in this
    module together with BaseHTTPServer.HTTPServer.
    """
    SimpleHTTPServer.test(HandlerClass, ServerClass)


# Script entry point: run the test driver with its default classes.
if __name__ == '__main__':
    test()
|