Added tests for simple function calls and introspection that are run
against a SimpleXMLRPCServer in a separate thread. Because of problems with WSAEWOULDBLOCK (error 10035) being raised by the server on Windows, these new tests are skipped on win32. [GSoC - Alan McIntyre]
This commit is contained in:
parent
2ee4128e9b
commit
a53872b09a
|
@ -4,6 +4,8 @@ import sys
|
|||
import time
|
||||
import unittest
|
||||
import xmlrpclib
|
||||
import SimpleXMLRPCServer
|
||||
import threading
|
||||
from test import test_support
|
||||
|
||||
try:
|
||||
|
@ -288,10 +290,108 @@ class BinaryTestCase(unittest.TestCase):
|
|||
self.assertEqual(str(t2), d)
|
||||
|
||||
|
||||
def test_main():
|
||||
test_support.run_unittest(XMLRPCTestCase, HelperTestCase,
|
||||
DateTimeTestCase, BinaryTestCase, FaultTestCase)
|
||||
PORT = None
|
||||
|
||||
def http_server(evt, numrequests):
|
||||
class TestInstanceClass:
|
||||
def div(self, x, y):
|
||||
'''This is the div function'''
|
||||
return x // y
|
||||
|
||||
|
||||
serv = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", 0),
|
||||
logRequests=False, bind_and_activate=False)
|
||||
|
||||
try:
|
||||
serv.socket.settimeout(3)
|
||||
serv.server_bind()
|
||||
global PORT
|
||||
PORT = serv.socket.getsockname()[1]
|
||||
serv.server_activate()
|
||||
serv.register_introspection_functions()
|
||||
serv.register_multicall_functions()
|
||||
serv.register_function(pow)
|
||||
serv.register_function(lambda x,y: x+y, 'add')
|
||||
serv.register_instance(TestInstanceClass())
|
||||
|
||||
# handle up to 'numrequests' requests
|
||||
while numrequests > 0:
|
||||
serv.handle_request()
|
||||
numrequests -= 1
|
||||
|
||||
except socket.timeout:
|
||||
pass
|
||||
finally:
|
||||
serv.socket.close()
|
||||
PORT = None
|
||||
evt.set()
|
||||
|
||||
|
||||
class HTTPTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.evt = threading.Event()
|
||||
# start server thread to handle just one request
|
||||
threading.Thread(target=http_server, args=(self.evt,2)).start()
|
||||
|
||||
# wait for port to be assigned to server
|
||||
n = 1000
|
||||
while n > 0 and PORT is None:
|
||||
time.sleep(0.001)
|
||||
n -= 1
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
def tearDown(self):
|
||||
# wait on the server thread to terminate
|
||||
self.evt.wait()
|
||||
|
||||
def test_simple1(self):
|
||||
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
|
||||
def test_introspection1(self):
|
||||
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
|
||||
meth = p.system.listMethods()
|
||||
expected_methods = set(['pow', 'div', 'add', 'system.listMethods',
|
||||
'system.methodHelp', 'system.methodSignature', 'system.multicall'])
|
||||
self.assertEqual(set(meth), expected_methods)
|
||||
|
||||
def test_introspection2(self):
|
||||
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
|
||||
divhelp = p.system.methodHelp('div')
|
||||
self.assertEqual(divhelp, 'This is the div function')
|
||||
|
||||
def test_introspection3(self):
|
||||
# the SimpleXMLRPCServer doesn't support signatures, but
|
||||
# at least check that we can try
|
||||
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
|
||||
divsig = p.system.methodSignature('div')
|
||||
self.assertEqual(divsig, 'signatures not supported')
|
||||
|
||||
def test_multicall(self):
|
||||
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
|
||||
multicall = xmlrpclib.MultiCall(p)
|
||||
multicall.add(2,3)
|
||||
multicall.pow(6,8)
|
||||
multicall.div(127,42)
|
||||
add_result, pow_result, div_result = multicall()
|
||||
self.assertEqual(add_result, 2+3)
|
||||
self.assertEqual(pow_result, 6**8)
|
||||
self.assertEqual(div_result, 127//42)
|
||||
|
||||
|
||||
def test_main():
|
||||
xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
|
||||
BinaryTestCase, FaultTestCase]
|
||||
|
||||
# The test cases against a SimpleXMLRPCServer raise a socket error
|
||||
# 10035 (WSAEWOULDBLOCK) in the server thread handle_request call when
|
||||
# run on Windows. This only happens on the first test to run, but it
|
||||
# fails every time and so these tests are skipped on win32 platforms.
|
||||
if sys.platform != 'win32':
|
||||
xmlrpc_tests.append(HTTPTestCase)
|
||||
|
||||
test_support.run_unittest(*xmlrpc_tests)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
||||
|
|
Loading…
Reference in New Issue