Sam's latest versions

This commit is contained in:
Guido van Rossum 1999-06-08 13:20:05 +00:00
parent cf09a3924f
commit a8d0f4fd2d
2 changed files with 109 additions and 87 deletions

View File

@ -63,10 +63,7 @@ class async_chat (asyncore.dispatcher):
asyncore.dispatcher.__init__ (self, conn) asyncore.dispatcher.__init__ (self, conn)
def set_terminator (self, term): def set_terminator (self, term):
"Set the input delimiter. Can be a fixed string of any length, or None" "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
if term is None:
self.terminator = ''
else:
self.terminator = term self.terminator = term
def get_terminator (self): def get_terminator (self):
@ -82,8 +79,7 @@ class async_chat (asyncore.dispatcher):
try: try:
data = self.recv (self.ac_in_buffer_size) data = self.recv (self.ac_in_buffer_size)
except socket.error, why: except socket.error, why:
import sys self.handle_error()
self.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback)
return return
self.ac_in_buffer = self.ac_in_buffer + data self.ac_in_buffer = self.ac_in_buffer + data
@ -94,17 +90,33 @@ class async_chat (asyncore.dispatcher):
# combos with a single recv(1024). # combos with a single recv(1024).
while self.ac_in_buffer: while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
terminator = self.get_terminator() terminator = self.get_terminator()
terminator_len = len(terminator) if terminator is None:
# 4 cases: # no terminator, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
elif type(terminator) == type(0):
# numeric terminator
n = terminator
if lb < n:
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
self.terminator = self.terminator - lb
else:
self.collect_incoming_data (self.ac_in_buffer[:n])
self.ac_in_buffer = self.ac_in_buffer[n:]
self.terminator = 0
self.found_terminator()
else:
# 3 cases:
# 1) end of buffer matches terminator exactly: # 1) end of buffer matches terminator exactly:
# collect data, transition # collect data, transition
# 2) end of buffer matches some prefix: # 2) end of buffer matches some prefix:
# collect data to the prefix # collect data to the prefix
# 3) end of buffer does not match any prefix: # 3) end of buffer does not match any prefix:
# collect data # collect data
# 4) no terminator, just collect the data terminator_len = len(terminator)
if terminator:
index = string.find (self.ac_in_buffer, terminator) index = string.find (self.ac_in_buffer, terminator)
if index != -1: if index != -1:
# we found the terminator # we found the terminator
@ -116,6 +128,7 @@ class async_chat (asyncore.dispatcher):
# check for a prefix of the terminator # check for a prefix of the terminator
index = find_prefix_at_end (self.ac_in_buffer, terminator) index = find_prefix_at_end (self.ac_in_buffer, terminator)
if index: if index:
if index != lb:
# we found a prefix, collect up to the prefix # we found a prefix, collect up to the prefix
self.collect_incoming_data (self.ac_in_buffer[:-index]) self.collect_incoming_data (self.ac_in_buffer[:-index])
self.ac_in_buffer = self.ac_in_buffer[-index:] self.ac_in_buffer = self.ac_in_buffer[-index:]
@ -124,10 +137,6 @@ class async_chat (asyncore.dispatcher):
# no prefix, collect it all # no prefix, collect it all
self.collect_incoming_data (self.ac_in_buffer) self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = '' self.ac_in_buffer = ''
else:
# no terminator, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
def handle_write (self): def handle_write (self):
self.initiate_send () self.initiate_send ()
@ -144,17 +153,27 @@ class async_chat (asyncore.dispatcher):
self.initiate_send() self.initiate_send()
def readable (self): def readable (self):
"predicate for inclusion in the readable for select()"
return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
def writable (self): def writable (self):
return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) "predicate for inclusion in the writable for select()"
# return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
# this is about twice as fast, though not as clear.
return not (
(self.ac_out_buffer is '') and
self.producer_fifo.is_empty() and
self.connected
)
def close_when_done (self): def close_when_done (self):
"automatically close this channel once the outgoing queue is empty"
self.producer_fifo.push (None) self.producer_fifo.push (None)
# refill the outgoing buffer by calling the more() method # refill the outgoing buffer by calling the more() method
# of the first producer in the queue # of the first producer in the queue
def refill_buffer (self): def refill_buffer (self):
_string_type = type('')
while 1: while 1:
if len(self.producer_fifo): if len(self.producer_fifo):
p = self.producer_fifo.first() p = self.producer_fifo.first()
@ -165,6 +184,10 @@ class async_chat (asyncore.dispatcher):
self.producer_fifo.pop() self.producer_fifo.pop()
self.close() self.close()
return return
elif type(p) is _string_type:
self.producer_fifo.pop()
self.ac_out_buffer = self.ac_out_buffer + p
return
data = p.more() data = p.more()
if data: if data:
self.ac_out_buffer = self.ac_out_buffer + data self.ac_out_buffer = self.ac_out_buffer + data
@ -177,15 +200,20 @@ class async_chat (asyncore.dispatcher):
def initiate_send (self): def initiate_send (self):
obs = self.ac_out_buffer_size obs = self.ac_out_buffer_size
# try to refill the buffer # try to refill the buffer
if (not self._push_mode) and (len (self.ac_out_buffer) < obs): if (len (self.ac_out_buffer) < obs):
self.refill_buffer() self.refill_buffer()
if self.ac_out_buffer and self.connected: if self.ac_out_buffer and self.connected:
# try to send the buffer # try to send the buffer
try:
num_sent = self.send (self.ac_out_buffer[:obs]) num_sent = self.send (self.ac_out_buffer[:obs])
if num_sent: if num_sent:
self.ac_out_buffer = self.ac_out_buffer[num_sent:] self.ac_out_buffer = self.ac_out_buffer[num_sent:]
except socket.error, why:
self.handle_error()
return
def discard_buffers (self): def discard_buffers (self):
# Emergencies only! # Emergencies only!
self.ac_in_buffer = '' self.ac_in_buffer = ''
@ -193,17 +221,8 @@ class async_chat (asyncore.dispatcher):
while self.producer_fifo: while self.producer_fifo:
self.producer_fifo.pop() self.producer_fifo.pop()
# ==================================================
# support for push mode.
# ==================================================
_push_mode = 0
def push_mode (self, boolean):
self._push_mode = boolean
def writable_push (self):
return self.connected and len(self.ac_out_buffer)
class simple_producer: class simple_producer:
def __init__ (self, data, buffer_size=512): def __init__ (self, data, buffer_size=512):
self.data = data self.data = data
self.buffer_size = buffer_size self.buffer_size = buffer_size
@ -228,6 +247,9 @@ class fifo:
def __len__ (self): def __len__ (self):
return len(self.list) return len(self.list)
def is_empty (self):
return self.list == []
def first (self): def first (self):
return self.list[0] return self.list[0]

View File

@ -37,38 +37,33 @@ if os.name == 'nt':
EALREADY = 10037 EALREADY = 10037
ECONNRESET = 10054 ECONNRESET = 10054
ENOTCONN = 10057 ENOTCONN = 10057
ESHUTDOWN = 10058
else: else:
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN
socket_map = {} socket_map = {}
def poll (timeout=0.0, ignore_expt=1): def poll (timeout=0.0):
if socket_map: if socket_map:
sockets = socket_map.keys() r = []; w = []; e = []
r = filter (lambda x: x.readable(), sockets) for s in socket_map.keys():
w = filter (lambda x: x.writable(), sockets) if s.readable():
if ignore_expt: r.append (s)
e = [] if s.writable():
else: w.append (s)
e = sockets[:]
(r,w,e) = select.select (r,w,e, timeout) (r,w,e) = select.select (r,w,e, timeout)
for x in e:
try:
x.handle_expt_event()
except:
x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback)
for x in r: for x in r:
try: try:
x.handle_read_event() x.handle_read_event()
except: except:
x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) x.handle_error()
for x in w: for x in w:
try: try:
x.handle_write_event() x.handle_write_event()
except: except:
x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) x.handle_error()
def poll2 (timeout=0.0): def poll2 (timeout=0.0):
import poll import poll
@ -88,7 +83,6 @@ def poll2 (timeout=0.0):
if flags: if flags:
l.append (fd, flags) l.append (fd, flags)
r = poll.poll (l, timeout) r = poll.poll (l, timeout)
print r
for fd, flags in r: for fd, flags in r:
s = fd_map[fd] s = fd_map[fd]
try: try:
@ -99,7 +93,7 @@ def poll2 (timeout=0.0):
if (flags & poll.POLLERR): if (flags & poll.POLLERR):
s.handle_expt_event() s.handle_expt_event()
except: except:
apply (s.handle_error, sys.exc_info()) s.handle_error()
def loop (timeout=30.0, use_poll=0): def loop (timeout=30.0, use_poll=0):
@ -149,11 +143,13 @@ class dispatcher:
return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar) return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)
def add_channel (self): def add_channel (self):
if __debug__:
self.log ('adding channel %s' % self) self.log ('adding channel %s' % self)
socket_map [self] = 1 socket_map [self] = 1
def del_channel (self): def del_channel (self):
if socket_map.has_key (self): if socket_map.has_key (self):
if __debug__:
self.log ('closing channel %d:%s' % (self.fileno(), self)) self.log ('closing channel %d:%s' % (self.fileno(), self))
del socket_map [self] del socket_map [self]
@ -164,7 +160,8 @@ class dispatcher:
self.add_channel() self.add_channel()
def set_socket (self, socket): def set_socket (self, socket):
self.socket = socket # This is done so we can be called safely from __init__
self.__dict__['socket'] = socket
self.add_channel() self.add_channel()
def set_reuse_addr (self): def set_reuse_addr (self):
@ -210,6 +207,7 @@ class dispatcher:
return self.socket.bind (addr) return self.socket.bind (addr)
def connect (self, address): def connect (self, address):
self.connected = 0
try: try:
self.socket.connect (address) self.socket.connect (address)
except socket.error, why: except socket.error, why:
@ -253,7 +251,7 @@ class dispatcher:
return data return data
except socket.error, why: except socket.error, why:
# winsock sometimes throws ENOTCONN # winsock sometimes throws ENOTCONN
if why[0] in [ECONNRESET, ENOTCONN]: if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
self.handle_close() self.handle_close()
return '' return ''
else: else:
@ -262,15 +260,12 @@ class dispatcher:
def close (self): def close (self):
self.del_channel() self.del_channel()
self.socket.close() self.socket.close()
self.connected = 0
# cheap inheritance, used to pass all other attribute # cheap inheritance, used to pass all other attribute
# references to the underlying socket object. # references to the underlying socket object.
# NOTE: this may be removed soon for performance reasons.
def __getattr__ (self, attr): def __getattr__ (self, attr):
if attr != 'socket':
return getattr (self.socket, attr) return getattr (self.socket, attr)
else:
raise AttributeError, attr
def log (self, message): def log (self, message):
print 'log:', message print 'log:', message
@ -299,9 +294,8 @@ class dispatcher:
def handle_expt_event (self): def handle_expt_event (self):
self.handle_expt() self.handle_expt()
def handle_error (self, *info): def handle_error (self):
(t,v,tb) = info (file,fun,line), t, v, tbinfo = compact_traceback()
(file,fun,line), tbinfo = compact_traceback (t,v,tb)
# sometimes a user repr method will crash. # sometimes a user repr method will crash.
try: try:
@ -312,33 +306,35 @@ class dispatcher:
print ( print (
'uncaptured python exception, closing channel %s (%s:%s %s)' % ( 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr, self_repr,
str(t), t,
str(v), v,
tbinfo tbinfo
) )
) )
del t,v,tb
self.close() self.close()
def handle_expt (self): def handle_expt (self):
if __debug__:
self.log ('unhandled exception') self.log ('unhandled exception')
def handle_read (self): def handle_read (self):
if __debug__:
self.log ('unhandled read event') self.log ('unhandled read event')
def handle_write (self): def handle_write (self):
if __debug__:
self.log ('unhandled write event') self.log ('unhandled write event')
def handle_connect (self): def handle_connect (self):
if __debug__:
self.log ('unhandled connect event') self.log ('unhandled connect event')
def handle_oob (self):
self.log ('unhandled out-of-band event')
def handle_accept (self): def handle_accept (self):
if __debug__:
self.log ('unhandled accept event') self.log ('unhandled accept event')
def handle_close (self): def handle_close (self):
if __debug__:
self.log ('unhandled close event') self.log ('unhandled close event')
self.close() self.close()
@ -373,7 +369,8 @@ class dispatcher_with_send (dispatcher):
# used for debugging. # used for debugging.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def compact_traceback (t,v,tb): def compact_traceback ():
t,v,tb = sys.exc_info()
tbinfo = [] tbinfo = []
while 1: while 1:
tbinfo.append ( tbinfo.append (
@ -385,6 +382,9 @@ def compact_traceback (t,v,tb):
if not tb: if not tb:
break break
# just to be safe
del tb
file, function, line = tbinfo[-1] file, function, line = tbinfo[-1]
info = '[' + string.join ( info = '[' + string.join (
map ( map (
@ -393,7 +393,7 @@ def compact_traceback (t,v,tb):
), ),
'] [' '] ['
) + ']' ) + ']'
return (file, function, line), info return (file, function, line), t, v, info
def close_all (): def close_all ():
global socket_map global socket_map
@ -450,4 +450,4 @@ if os.name == 'posix':
def set_file (self, fd): def set_file (self, fd):
self.socket = file_wrapper (fd) self.socket = file_wrapper (fd)
self.add_channel() self.add_channel()
#not really