370 lines
12 KiB
Python
370 lines
12 KiB
Python
"""protocol (David Scherer <dscherer@cmu.edu>)
|
|
|
|
This module implements a simple RPC or "distributed object" protocol.
|
|
I am probably the 100,000th person to write this in Python, but, hey,
|
|
it was fun.
|
|
|
|
Contents:
|
|
|
|
connectionLost is an exception that will be thrown by functions in
|
|
the protocol module or calls to remote methods that fail because
|
|
the remote program has closed the socket or because no connection
|
|
could be established in the first place.
|
|
|
|
Server( port=None, connection_hook=None ) creates a server on a
|
|
well-known port, to which clients can connect. When a client
|
|
connects, a Connection is created for it. If connection_hook
|
|
is defined, then connection_hook( socket.getpeername() ) is called
|
|
before a Connection is created, and if it returns false then the
|
|
connection is refused. connection_hook must be prepared to be
|
|
called from any thread.
|
|
|
|
Client( ip='127.0.0.1', port=None ) returns a Connection to a Server
|
|
object at a well-known address and port.
|
|
|
|
Connection( socket ) creates an RPC connection on an arbitrary socket,
|
|
which must already be connected to another program. You do not
|
|
need to use this directly if you are using Client() or Server().
|
|
|
|
publish( name, connect_function ) provides an object with the
|
|
specified name to some or all Connections. When another program
|
|
calls Connection.getobject() with the specified name, the
|
|
specified connect_function is called with the arguments
|
|
|
|
connect_function( conn, addr )
|
|
|
|
where conn is the Connection object to the requesting client and
|
|
addr is the address returned by socket.getpeername(). If that
|
|
function returns an object, that object becomes accessible to
|
|
the caller. If it returns None, the caller's request fails.
|
|
|
|
Connection objects:
|
|
|
|
.close() refuses additional RPC messages from the peer, and notifies
|
|
the peer that the connection has been closed. All pending remote
|
|
method calls in either program will fail with a connectionLost
|
|
exception. Further remote method calls on this connection will
|
|
also result in errors.
|
|
|
|
.getobject(name) returns a proxy for the remote object with the
|
|
specified name, if it exists and the peer permits us access.
|
|
Otherwise, it returns None. It may throw a connectionLost
|
|
exception. The returned proxy supports basic attribute access
|
|
and method calls, and its methods have an extra attribute,
|
|
.void, which is a function that has the same effect but always
|
|
returns None. This last capability is provided as a performance
|
|
hack: object.method.void(params) can return without waiting for
|
|
the remote process to respond, but object.method(params) needs
|
|
to wait for a return value or exception.
|
|
|
|
.rpc_loop(block=0) processes *incoming* messages for this connection.
|
|
If block=1, it continues processing until an exception or return
|
|
value is received, which is normally forever. Otherwise it
|
|
returns when all currently pending messages have been delivered.
|
|
It may throw a connectionLost exception.
|
|
|
|
.set_close_hook(f) specifies a function to be called when the remote
|
|
object closes the connection during a call to rpc_loop(). This
|
|
is a good way for servers to be notified when clients disconnect.
|
|
|
|
.set_shutdown_hook(f) specifies a function called *immediately* when
|
|
the receive loop detects that the connection has been lost. The
|
|
provided function must be prepared to run in any thread.
|
|
|
|
Server objects:
|
|
|
|
.rpc_loop() processes incoming messages on all connections, and
|
|
returns when all pending messages have been processed. It will
|
|
*not* throw connectionLost exceptions; the
|
|
Connection.set_close_hook() mechanism is much better for servers.
|
|
"""
|
|
|
|
import sys, os, string, types
|
|
import socket
|
|
from threading import Thread
|
|
from Queue import Queue, Empty
|
|
from cPickle import Pickler, Unpickler, PicklingError
|
|
|
|
class connectionLost(Exception):
    """Raised when the peer has gone away or could not be reached.

    what -- optional human-readable reason; it is also the value of
            str() and repr() for the exception.

    Deriving from Exception (rather than being a plain class) lets the
    object be raised and caught on every Python version and keeps it
    visible to generic ``except Exception`` handlers.
    """

    def __init__(self, what=""):
        # Pass the reason on so that .args (and therefore pickling)
        # carries it as well.
        Exception.__init__(self, what)
        self.what = what

    def __repr__(self):
        return self.what

    def __str__(self):
        return self.what
|
|
|
|
def getmethods(cls):
    """Return the names of the plain functions defined on cls and its bases.

    Base classes are visited first (recursively, in __bases__ order),
    followed by the class's own __dict__.  A name that is defined at
    several levels of the hierarchy appears once per definition.
    """
    names = []
    for base in cls.__bases__:
        names.extend(getmethods(base))
    for attr, value in cls.__dict__.items():
        # Only real function objects count; other callables are skipped.
        if isinstance(value, types.FunctionType):
            names.append(attr)
    return names
|
|
|
|
class methodproxy:
    """Callable stand-in for one named method of a remote object.

    classp -- the owning proxy; its .client and .name are used
    name   -- name of the remote method
    """

    def __init__(self, classp, name):
        self.classp, self.name = classp, name
        self.client = classp.client

    def __call__(self, *args, **keywords):
        # 'm' request: (object name, method name, positional, keyword).
        target = self.classp.name
        return self.client.call('m', target, self.name, args, keywords)

    def void(self, *args, **keywords):
        # Same request as __call__, sent through call_void(); the
        # result of the remote call is not returned.
        target = self.classp.name
        self.client.call_void('m', target, self.name, args, keywords)
|
|
|
|
class classproxy:
    """Local stand-in for an object living in the peer process.

    Each name in `methods` becomes a methodproxy attribute.  Any other
    attribute read is forwarded as a 'g' request and every attribute
    assignment is forwarded as an 's' request through `client`.
    """

    def __init__(self, client, name, methods):
        # Store directly in the instance dictionary: ordinary
        # assignment would be redirected to the peer by __setattr__.
        attrs = self.__dict__
        attrs['client'] = client
        attrs['name'] = name

        for method_name in methods:
            attrs[method_name] = methodproxy(self, method_name)

    def __getattr__(self, attr):
        # Only reached for names that are not in the instance dictionary.
        remote_value = self.client.call('g', self.name, attr)
        return remote_value

    def __setattr__(self, attr, value):
        self.client.call_void('s', self.name, attr, value)
|
|
|
|
# Registry of published objects: public name -> connect function.
local_connect = dict()


def publish(name, connect_function):
    """Make an object available to peers under `name`.

    connect_function is stored in local_connect; publishing the same
    name again replaces the earlier entry.
    """
    local_connect[name] = connect_function
|
|
|
|
class socketFile:
    """File emulator based on a socket.  Provides only blocking semantics.

    Offers the write(), read() and readline() methods, backed by a
    private buffer of data that has been received but not yet consumed.
    Every socket failure, and an orderly close by the peer, is reported
    as connectionLost.
    """

    def __init__(self, socket):
        self.socket = socket
        self.buffer = ''    # received but not yet handed out

    def _recv(self, bytes):
        """Receive up to `bytes` bytes; raise connectionLost on error/EOF."""
        try:
            r = self.socket.recv(bytes)
        except Exception:
            raise connectionLost()
        if not r:
            # recv() returning nothing means the peer closed the socket.
            raise connectionLost()
        return r

    def write(self, string):
        """Send all of `string`; raise connectionLost if that fails."""
        try:
            # sendall(), not send(): send() may transmit only part of
            # the data, which would silently truncate the message.
            self.socket.sendall(string)
        except Exception:
            raise connectionLost()

    def read(self, bytes):
        """Return exactly `bytes` bytes, blocking until they have arrived."""
        while len(self.buffer) < bytes:
            missing = bytes - len(self.buffer)
            self.buffer = self.buffer + self._recv(missing)
        s = self.buffer[:bytes]
        self.buffer = self.buffer[bytes:]
        return s

    def readline(self):
        """Return the next line, including its trailing newline."""
        while 1:
            f = self.buffer.find('\n')
            if f >= 0:
                s = self.buffer[:f + 1]
                self.buffer = self.buffer[f + 1:]
                return s
            self.buffer = self.buffer + self._recv(1024)
|
|
|
|
|
|
class Connection(Thread):
    """RPC endpoint bound to one already-connected socket.

    Creating a Connection starts a daemon thread (see run()) that reads
    pickled (cmd, arg, ret) messages from the socket and places them on
    a queue.  Nothing is executed on that thread: queued messages are
    dispatched by whichever thread calls rpc_loop().

    Message commands: 'm' method call, 'g' attribute get, 's' attribute
    set, 'o' object lookup, 'e' exception, 'r' return value.  `ret` is
    true when the sender expects an 'r' or 'e' reply.

    NOTE(security): incoming bytes are handed straight to Unpickler, and
    unpickling data from an untrusted peer can execute arbitrary code.
    Only connect to peers you trust.
    """

    debug = 0

    # No hooks are installed by default; see set_close_hook() and
    # set_shutdown_hook().
    close_hook = None
    shutdown_hook = None

    def __init__(self, socket):
        self.local_objects = {}
        self.socket = socket
        # NOTE(review): on Python 2.6+ Thread.name is a property that
        # appears to require Thread.__init__() to have run first --
        # confirm the interpreter versions this module must support.
        self.name = socket.getpeername()
        self.socketfile = socketFile(socket)
        self.queue = Queue(-1)
        self.refuse_messages = 0
        self.cmds = {'m': self.r_meth,
                     'g': self.r_get,
                     's': self.r_set,
                     'o': self.r_geto,
                     'e': self.r_exc,
                     # 'r' handled by rpc_loop
                     }

        Thread.__init__(self)
        self.setDaemon(1)
        self.start()

    def getobject(self, name):
        """Return a classproxy for the peer's object `name`, or None."""
        methods = self.call('o', name)
        if methods is None:
            return None
        return classproxy(self, name, methods)

    def set_close_hook(self, hook):
        """Install `hook`; it is called from rpc_loop(), like a normal
        remote method invocation, when the connection is found lost."""
        self.close_hook = hook

    def set_shutdown_hook(self, hook):
        """Install `hook`; it is called directly from whichever thread
        detects the loss (including the run() thread), so it needs to be
        thread safe."""
        self.shutdown_hook = hook

    def close(self):
        """Shut the socket down and ignore any further queued messages."""
        self._shutdown()
        self.refuse_messages = 1

    def call(self, c, *args):
        """Send request `c` and process messages until its reply arrives.

        Returns the value of the 'r' reply; an 'e' reply is re-raised by
        r_exc().
        """
        self.send((c, args, 1))
        return self.rpc_loop(block=1)

    def call_void(self, c, *args):
        """Send request `c` without asking for a reply."""
        try:
            self.send((c, args, 0))
        except:
            # Deliberately best-effort: a void call never reports
            # failure to its caller.
            pass

    # the following methods handle individual RPC calls:

    def r_geto(self, obj):
        """Look up the published object `obj` for this peer.

        Returns the list of its method names, or None when nothing is
        published under that name or the connect function returns a
        false value.
        """
        c = local_connect.get(obj)
        if not c:
            return None
        o = c(self, self.name)
        if not o:
            return None
        self.local_objects[obj] = o
        return getmethods(o.__class__)

    def r_meth(self, obj, name, args, keywords):
        """Call method `name` of local object `obj` and return its result."""
        method = getattr(self.local_objects[obj], name)
        return method(*args, **keywords)

    def r_get(self, obj, name):
        """Return attribute `name` of local object `obj`."""
        return getattr(self.local_objects[obj], name)

    def r_set(self, obj, name, value):
        """Set attribute `name` of local object `obj` to `value`."""
        setattr(self.local_objects[obj], name, value)

    def r_exc(self, e, v):
        """Raise the exception described by class `e` and value `v`.

        Mirrors the two-argument raise statement for an exception
        class: an instance of `e` is raised as is, None means "no
        arguments", a tuple supplies the constructor arguments, and
        anything else is the single constructor argument.
        """
        if isinstance(v, e):
            raise v
        if v is None:
            raise e()
        if isinstance(v, tuple):
            raise e(*v)
        raise e(v)

    def rpc_exec(self, cmd, arg, ret):
        """Execute one incoming message.

        When `ret` is true the outcome is sent back as an 'r' (result)
        or 'e' (exception) message; otherwise exceptions propagate to
        the caller of rpc_loop().
        """
        if self.refuse_messages:
            return
        if self.debug:
            print("%s %s %s" % (cmd, arg, ret))
        handler = self.cmds.get(cmd)
        if not ret:
            # we cannot report exceptions to the caller, so
            # we report them in this process.
            handler(*arg)
            return
        try:
            r = handler(*arg)
            self.send(('r', r, 0))
        except:
            # Bare except on purpose: whatever went wrong is forwarded
            # to the peer that is waiting for an answer.
            try:
                self.send(('e', sys.exc_info()[:2], 0))
            except PicklingError:
                self.send(('e', (TypeError, 'Unpicklable exception.'), 0))

    # the following methods implement the RPC and message loops:

    def rpc_loop(self, block=0):
        """Dispatch queued incoming messages.

        block -- passed to Queue.get(); when false the loop returns
                 None as soon as the queue is empty.

        Returns the payload of the first 'r' message encountered.
        Raises connectionLost if close() was already called.  When
        connectionLost surfaces while dispatching, close_hook (if set)
        is called once and the exception is re-raised.
        """
        if self.refuse_messages:
            raise connectionLost('(already closed)')
        try:
            while 1:
                try:
                    cmd, arg, ret = self.queue.get(block)
                except Empty:
                    return None
                if cmd == 'r':
                    return arg
                self.rpc_exec(cmd, arg, ret)
        except connectionLost:
            if self.close_hook:
                self.close_hook()
                self.close_hook = None
            raise

    def run(self):
        """Receiver thread: queue every incoming message.

        Any failure ends the thread; it is queued as an 'e' message so
        that it is re-raised in the thread that calls rpc_loop().
        """
        try:
            while 1:
                self.queue.put(self.recv())
        except:
            self.queue.put(('e', sys.exc_info()[:2], 0))

    # The following send raw pickled data to the peer

    def send(self, data):
        """Pickle `data` onto the socket; raise connectionLost on failure."""
        try:
            Pickler(self.socketfile, 1).dump(data)
        except connectionLost:
            self._connection_lost()
            raise

    def recv(self):
        """Unpickle and return the next message from the socket."""
        try:
            return Unpickler(self.socketfile).load()
        except connectionLost:
            self._connection_lost()
            raise

    def _connection_lost(self):
        # Shared reaction of send() and recv() to a dead connection.
        self._shutdown()
        if self.shutdown_hook:
            self.shutdown_hook()

    def _shutdown(self):
        """Shut down and close the socket, ignoring errors from either."""
        try:
            self.socket.shutdown(1)
        except Exception:
            pass
        # Close even when shutdown() failed (e.g. the peer is already
        # gone); otherwise the descriptor would leak.
        try:
            self.socket.close()
        except Exception:
            pass
|
|
|
|
|
|
class Server(Thread):
    """Accepts clients on a well-known port, one Connection per client.

    The listening socket is bound in __init__, which also starts a
    daemon thread (see run()) that accepts connections.  Accepted
    connections are collected in self.connections and serviced by
    rpc_loop().
    """

    default_port = 0x1D1E   # "IDlE"

    def __init__(self, port=None, connection_hook=None):
        """Bind to `port` (default_port when false) and start accepting.

        connection_hook -- optional callable taking the peer address; a
                           false result refuses the connection.  It is
                           called from the accepting thread.

        Raises connectionLost if the listening socket cannot be set up.
        """
        self.connections = []
        self.port = port or self.default_port
        self.connection_hook = connection_hook

        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(('', self.port))
            s.listen(3)
        except Exception:
            reason = sys.exc_info()[1]
            # Do not leak the half-initialised socket.
            if s is not None:
                s.close()
            raise connectionLost(str(reason))
        self.wellknown = s

        Thread.__init__(self)
        self.setDaemon(1)
        self.start()

    def run(self):
        """Accept loop: wrap each permitted client in a Connection."""
        listener = self.wellknown
        while 1:
            conn, addr = listener.accept()
            if self.connection_hook and not self.connection_hook(addr):
                # Refused: shut down AND close, so the descriptor is
                # released instead of lingering until garbage collection.
                try:
                    conn.shutdown(1)
                except Exception:
                    pass
                try:
                    conn.close()
                except Exception:
                    pass
                continue
            self.connections.append(Connection(conn))

    def rpc_loop(self):
        """Process pending messages on every connection without blocking.

        Connections that report connectionLost are dropped from
        self.connections; the exception is not propagated.
        """
        # Iterate over a copy: the list is modified below and by run().
        for c in self.connections[:]:
            try:
                c.rpc_loop(block=0)
            except connectionLost:
                if c in self.connections:
                    self.connections.remove(c)
|
|
|
|
def Client(ip='127.0.0.1', port=None):
    """Connect to a Server and return the resulting Connection.

    ip   -- address of the server
    port -- port of the server; Server.default_port when false

    Raises connectionLost if the connection cannot be established; for
    socket errors the exception carries the error text.
    """
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, port or Server.default_port))
    except socket.error:
        what = sys.exc_info()[1]
        # Release the unconnected socket instead of leaking it.
        if s is not None:
            s.close()
        raise connectionLost(str(what))
    except Exception:
        if s is not None:
            s.close()
        raise connectionLost()
    return Connection(s)
|