mirror of https://github.com/python/cpython
125 lines
3.4 KiB
Python
Executable File
125 lines
3.4 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
"""A multi-threaded telnet-like server that gives a Python prompt.
|
|
|
|
This is really a prototype for the same thing in C.
|
|
|
|
Usage: pysvr.py [port]
|
|
|
|
For security reasons, it only accepts requests from the current host.
|
|
This can still be insecure, but restricts violations from people who
|
|
can log in on your machine. Use with caution!
|
|
|
|
"""
|
|
|
|
import sys, os, string, getopt, _thread, socket, traceback
|
|
|
|
PORT = 4000 # Default port
|
|
|
|
def main():
|
|
try:
|
|
opts, args = getopt.getopt(sys.argv[1:], "")
|
|
if len(args) > 1:
|
|
raise getopt.error("Too many arguments.")
|
|
except getopt.error as msg:
|
|
usage(msg)
|
|
for o, a in opts:
|
|
pass
|
|
if args:
|
|
try:
|
|
port = string.atoi(args[0])
|
|
except ValueError as msg:
|
|
usage(msg)
|
|
else:
|
|
port = PORT
|
|
main_thread(port)
|
|
|
|
def usage(msg=None):
|
|
sys.stdout = sys.stderr
|
|
if msg:
|
|
print(msg)
|
|
print("\n", __doc__, end=' ')
|
|
sys.exit(2)
|
|
|
|
def main_thread(port):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.bind(("", port))
|
|
sock.listen(5)
|
|
print("Listening on port", port, "...")
|
|
while 1:
|
|
(conn, addr) = sock.accept()
|
|
if addr[0] != conn.getsockname()[0]:
|
|
conn.close()
|
|
print("Refusing connection from non-local host", addr[0], ".")
|
|
continue
|
|
_thread.start_new_thread(service_thread, (conn, addr))
|
|
del conn, addr
|
|
|
|
def service_thread(conn, addr):
|
|
(caddr, cport) = addr
|
|
print("Thread %s has connection from %s.\n" % (str(_thread.get_ident()),
|
|
caddr), end=' ')
|
|
stdin = conn.makefile("r")
|
|
stdout = conn.makefile("w", 0)
|
|
run_interpreter(stdin, stdout)
|
|
print("Thread %s is done.\n" % str(_thread.get_ident()), end=' ')
|
|
|
|
def run_interpreter(stdin, stdout):
|
|
globals = {}
|
|
try:
|
|
str(sys.ps1)
|
|
except:
|
|
sys.ps1 = ">>> "
|
|
source = ""
|
|
while 1:
|
|
stdout.write(sys.ps1)
|
|
line = stdin.readline()
|
|
if line[:2] == '\377\354':
|
|
line = ""
|
|
if not line and not source:
|
|
break
|
|
if line[-2:] == '\r\n':
|
|
line = line[:-2] + '\n'
|
|
source = source + line
|
|
try:
|
|
code = compile_command(source)
|
|
except SyntaxError as err:
|
|
source = ""
|
|
traceback.print_exception(SyntaxError, err, None, file=stdout)
|
|
continue
|
|
if not code:
|
|
continue
|
|
source = ""
|
|
try:
|
|
run_command(code, stdin, stdout, globals)
|
|
except SystemExit as how:
|
|
if how:
|
|
try:
|
|
how = str(how)
|
|
except:
|
|
how = ""
|
|
stdout.write("Exit %s\n" % how)
|
|
break
|
|
stdout.write("\nGoodbye.\n")
|
|
|
|
def run_command(code, stdin, stdout, globals):
|
|
save = sys.stdin, sys.stdout, sys.stderr
|
|
try:
|
|
sys.stdout = sys.stderr = stdout
|
|
sys.stdin = stdin
|
|
try:
|
|
exec(code, globals)
|
|
except SystemExit as how:
|
|
raise SystemExit(how).with_traceback(sys.exc_info()[2])
|
|
except:
|
|
type, value, tb = sys.exc_info()
|
|
if tb: tb = tb.tb_next
|
|
traceback.print_exception(type, value, tb)
|
|
del tb
|
|
finally:
|
|
sys.stdin, sys.stdout, sys.stderr = save
|
|
|
|
from code import compile_command
|
|
|
|
main()
|