Whitespace standardization.

This commit is contained in:
Tim Peters 2001-01-14 18:09:23 +00:00
parent 538f05c94d
commit 146965abf2
6 changed files with 484 additions and 485 deletions

View File

@ -702,7 +702,7 @@ class Aifc_write:
if len(self._markers) == 0:
return None
return self._markers
def tell(self):
return self._nframeswritten

View File

@ -2,13 +2,13 @@
Instead of
import dbm
d = dbm.open(file, 'w', 0666)
import dbm
d = dbm.open(file, 'w', 0666)
use
import anydbm
d = anydbm.open(file, 'w')
import anydbm
d = anydbm.open(file, 'w')
The returned object is a dbhash, gdbm, dbm or dumbdbm object,
dependent on the type of database being opened (determined by whichdb
@ -19,14 +19,14 @@ order).
It has the following interface (key and data are strings):
d[key] = data # store data at key (may override data at
# existing key)
data = d[key] # retrieve data at key (raise KeyError if no
# such key)
del d[key] # delete data stored at key (raises KeyError
# if no such key)
flag = d.has_key(key) # true if the key exists
list = d.keys() # return a list of all existing keys (slow!)
d[key] = data # store data at key (may override data at
# existing key)
data = d[key] # retrieve data at key (raise KeyError if no
# such key)
del d[key] # delete data stored at key (raises KeyError
# if no such key)
flag = d.has_key(key) # true if the key exists
list = d.keys() # return a list of all existing keys (slow!)
Future versions may change the order in which implementations are
tested for existence, add interfaces to other dbm-like
@ -43,44 +43,44 @@ only if it doesn't exist; and 'n' always creates a new database.
"""
try:
class error(Exception):
pass
class error(Exception):
pass
except:
error = "anydbm.error"
error = "anydbm.error"
_names = ['dbhash', 'gdbm', 'dbm', 'dumbdbm']
_errors = [error]
_defaultmod = None
for _name in _names:
try:
_mod = __import__(_name)
except ImportError:
continue
if not _defaultmod:
_defaultmod = _mod
_errors.append(_mod.error)
try:
_mod = __import__(_name)
except ImportError:
continue
if not _defaultmod:
_defaultmod = _mod
_errors.append(_mod.error)
if not _defaultmod:
raise ImportError, "no dbm clone found; tried %s" % _names
raise ImportError, "no dbm clone found; tried %s" % _names
error = tuple(_errors)
def open(file, flag = 'r', mode = 0666):
# guess the type of an existing database
from whichdb import whichdb
result=whichdb(file)
if result is None:
# db doesn't exist
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new
# flag was used so use default type
mod = _defaultmod
else:
raise error, "need 'c' or 'n' flag to open new db"
elif result == "":
# db type cannot be determined
raise error, "db type could not be determined"
else:
mod = __import__(result)
return mod.open(file, flag, mode)
# guess the type of an existing database
from whichdb import whichdb
result=whichdb(file)
if result is None:
# db doesn't exist
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new
# flag was used so use default type
mod = _defaultmod
else:
raise error, "need 'c' or 'n' flag to open new db"
elif result == "":
# db type cannot be determined
raise error, "db type could not be determined"
else:
mod = __import__(result)
return mod.open(file, flag, mode)

View File

@ -1,12 +1,12 @@
# -*- Mode: Python; tab-width: 4 -*-
# Id: asynchat.py,v 2.25 1999/11/18 11:01:08 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# Id: asynchat.py,v 2.25 1999/11/18 11:01:08 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
@ -15,7 +15,7 @@
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
@ -51,224 +51,224 @@ import asyncore
import string
class async_chat (asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
# these are overridable defaults
# these are overridable defaults
ac_in_buffer_size = 4096
ac_out_buffer_size = 4096
ac_in_buffer_size = 4096
ac_out_buffer_size = 4096
def __init__ (self, conn=None):
self.ac_in_buffer = ''
self.ac_out_buffer = ''
self.producer_fifo = fifo()
asyncore.dispatcher.__init__ (self, conn)
def __init__ (self, conn=None):
self.ac_in_buffer = ''
self.ac_out_buffer = ''
self.producer_fifo = fifo()
asyncore.dispatcher.__init__ (self, conn)
def set_terminator (self, term):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
self.terminator = term
def set_terminator (self, term):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
self.terminator = term
def get_terminator (self):
return self.terminator
def get_terminator (self):
return self.terminator
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
def handle_read (self):
def handle_read (self):
try:
data = self.recv (self.ac_in_buffer_size)
except socket.error, why:
self.handle_error()
return
try:
data = self.recv (self.ac_in_buffer_size)
except socket.error, why:
self.handle_error()
return
self.ac_in_buffer = self.ac_in_buffer + data
self.ac_in_buffer = self.ac_in_buffer + data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(1024).
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(1024).
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
terminator = self.get_terminator()
if terminator is None:
# no terminator, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
elif type(terminator) == type(0):
# numeric terminator
n = terminator
if lb < n:
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
self.terminator = self.terminator - lb
else:
self.collect_incoming_data (self.ac_in_buffer[:n])
self.ac_in_buffer = self.ac_in_buffer[n:]
self.terminator = 0
self.found_terminator()
else:
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
terminator_len = len(terminator)
index = string.find (self.ac_in_buffer, terminator)
if index != -1:
# we found the terminator
if index > 0:
# don't bother reporting the empty string (source of subtle bugs)
self.collect_incoming_data (self.ac_in_buffer[:index])
self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
# This does the Right Thing if the terminator is changed here.
self.found_terminator()
else:
# check for a prefix of the terminator
index = find_prefix_at_end (self.ac_in_buffer, terminator)
if index:
if index != lb:
# we found a prefix, collect up to the prefix
self.collect_incoming_data (self.ac_in_buffer[:-index])
self.ac_in_buffer = self.ac_in_buffer[-index:]
break
else:
# no prefix, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
terminator = self.get_terminator()
if terminator is None:
# no terminator, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
elif type(terminator) == type(0):
# numeric terminator
n = terminator
if lb < n:
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
self.terminator = self.terminator - lb
else:
self.collect_incoming_data (self.ac_in_buffer[:n])
self.ac_in_buffer = self.ac_in_buffer[n:]
self.terminator = 0
self.found_terminator()
else:
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
terminator_len = len(terminator)
index = string.find (self.ac_in_buffer, terminator)
if index != -1:
# we found the terminator
if index > 0:
# don't bother reporting the empty string (source of subtle bugs)
self.collect_incoming_data (self.ac_in_buffer[:index])
self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
# This does the Right Thing if the terminator is changed here.
self.found_terminator()
else:
# check for a prefix of the terminator
index = find_prefix_at_end (self.ac_in_buffer, terminator)
if index:
if index != lb:
# we found a prefix, collect up to the prefix
self.collect_incoming_data (self.ac_in_buffer[:-index])
self.ac_in_buffer = self.ac_in_buffer[-index:]
break
else:
# no prefix, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
def handle_write (self):
self.initiate_send ()
def handle_close (self):
self.close()
def handle_write (self):
self.initiate_send ()
def push (self, data):
self.producer_fifo.push (simple_producer (data))
self.initiate_send()
def handle_close (self):
self.close()
def push_with_producer (self, producer):
self.producer_fifo.push (producer)
self.initiate_send()
def push (self, data):
self.producer_fifo.push (simple_producer (data))
self.initiate_send()
def readable (self):
"predicate for inclusion in the readable for select()"
return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
def push_with_producer (self, producer):
self.producer_fifo.push (producer)
self.initiate_send()
def writable (self):
"predicate for inclusion in the writable for select()"
# return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
# this is about twice as fast, though not as clear.
return not (
(self.ac_out_buffer is '') and
self.producer_fifo.is_empty() and
self.connected
)
def readable (self):
"predicate for inclusion in the readable for select()"
return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
def close_when_done (self):
"automatically close this channel once the outgoing queue is empty"
self.producer_fifo.push (None)
def writable (self):
"predicate for inclusion in the writable for select()"
# return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
# this is about twice as fast, though not as clear.
return not (
(self.ac_out_buffer is '') and
self.producer_fifo.is_empty() and
self.connected
)
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
def refill_buffer (self):
_string_type = type('')
while 1:
if len(self.producer_fifo):
p = self.producer_fifo.first()
# a 'None' in the producer fifo is a sentinel,
# telling us to close the channel.
if p is None:
if not self.ac_out_buffer:
self.producer_fifo.pop()
self.close()
return
elif type(p) is _string_type:
self.producer_fifo.pop()
self.ac_out_buffer = self.ac_out_buffer + p
return
data = p.more()
if data:
self.ac_out_buffer = self.ac_out_buffer + data
return
else:
self.producer_fifo.pop()
else:
return
def close_when_done (self):
"automatically close this channel once the outgoing queue is empty"
self.producer_fifo.push (None)
def initiate_send (self):
obs = self.ac_out_buffer_size
# try to refill the buffer
if (len (self.ac_out_buffer) < obs):
self.refill_buffer()
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
def refill_buffer (self):
_string_type = type('')
while 1:
if len(self.producer_fifo):
p = self.producer_fifo.first()
# a 'None' in the producer fifo is a sentinel,
# telling us to close the channel.
if p is None:
if not self.ac_out_buffer:
self.producer_fifo.pop()
self.close()
return
elif type(p) is _string_type:
self.producer_fifo.pop()
self.ac_out_buffer = self.ac_out_buffer + p
return
data = p.more()
if data:
self.ac_out_buffer = self.ac_out_buffer + data
return
else:
self.producer_fifo.pop()
else:
return
if self.ac_out_buffer and self.connected:
# try to send the buffer
try:
num_sent = self.send (self.ac_out_buffer[:obs])
if num_sent:
self.ac_out_buffer = self.ac_out_buffer[num_sent:]
def initiate_send (self):
obs = self.ac_out_buffer_size
# try to refill the buffer
if (len (self.ac_out_buffer) < obs):
self.refill_buffer()
except socket.error, why:
self.handle_error()
return
if self.ac_out_buffer and self.connected:
# try to send the buffer
try:
num_sent = self.send (self.ac_out_buffer[:obs])
if num_sent:
self.ac_out_buffer = self.ac_out_buffer[num_sent:]
def discard_buffers (self):
# Emergencies only!
self.ac_in_buffer = ''
self.ac_out_buffer = ''
while self.producer_fifo:
self.producer_fifo.pop()
except socket.error, why:
self.handle_error()
return
def discard_buffers (self):
# Emergencies only!
self.ac_in_buffer = ''
self.ac_out_buffer = ''
while self.producer_fifo:
self.producer_fifo.pop()
class simple_producer:
def __init__ (self, data, buffer_size=512):
self.data = data
self.buffer_size = buffer_size
def __init__ (self, data, buffer_size=512):
self.data = data
self.buffer_size = buffer_size
def more (self):
if len (self.data) > self.buffer_size:
result = self.data[:self.buffer_size]
self.data = self.data[self.buffer_size:]
return result
else:
result = self.data
self.data = ''
return result
def more (self):
if len (self.data) > self.buffer_size:
result = self.data[:self.buffer_size]
self.data = self.data[self.buffer_size:]
return result
else:
result = self.data
self.data = ''
return result
class fifo:
def __init__ (self, list=None):
if not list:
self.list = []
else:
self.list = list
def __len__ (self):
return len(self.list)
def __init__ (self, list=None):
if not list:
self.list = []
else:
self.list = list
def is_empty (self):
return self.list == []
def __len__ (self):
return len(self.list)
def first (self):
return self.list[0]
def is_empty (self):
return self.list == []
def push (self, data):
self.list.append (data)
def first (self):
return self.list[0]
def pop (self):
if self.list:
result = self.list[0]
del self.list[0]
return (1, result)
else:
return (0, None)
def push (self, data):
self.list.append (data)
def pop (self):
if self.list:
result = self.list[0]
del self.list[0]
return (1, result)
else:
return (0, None)
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
@ -281,13 +281,13 @@ class fifo:
# this could maybe be made faster with a computed regex?
##def find_prefix_at_end (haystack, needle):
## nl = len(needle)
## result = 0
## for i in range (1,nl):
## if haystack[-(nl-i):] == needle[:(nl-i)]:
## result = nl-i
## break
## return result
## nl = len(needle)
## result = 0
## for i in range (1,nl):
## if haystack[-(nl-i):] == needle[:(nl-i)]:
## result = nl-i
## break
## return result
# yes, this is about twice as fast, but still seems
# to be negligible CPU. The previous version could do about 290
@ -298,21 +298,21 @@ import regex
prefix_cache = {}
def prefix_regex (needle):
if prefix_cache.has_key (needle):
return prefix_cache[needle]
else:
reg = needle[-1]
for i in range(1,len(needle)):
reg = '%c\(%s\)?' % (needle[-(i+1)], reg)
reg = regex.compile (reg+'$')
prefix_cache[needle] = reg, len(needle)
return reg, len(needle)
if prefix_cache.has_key (needle):
return prefix_cache[needle]
else:
reg = needle[-1]
for i in range(1,len(needle)):
reg = '%c\(%s\)?' % (needle[-(i+1)], reg)
reg = regex.compile (reg+'$')
prefix_cache[needle] = reg, len(needle)
return reg, len(needle)
def find_prefix_at_end (haystack, needle):
reg, length = prefix_regex (needle)
lh = len(haystack)
result = reg.search (haystack, max(0,lh-length))
if result >= 0:
return (lh - result)
else:
return 0
reg, length = prefix_regex (needle)
lh = len(haystack)
result = reg.search (haystack, max(0,lh-length))
if result >= 0:
return (lh - result)
else:
return 0

View File

@ -1,12 +1,12 @@
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
@ -15,7 +15,7 @@
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
@ -28,22 +28,22 @@
"""Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
sophisticated high-performance network servers and clients a snap.
"""
import exceptions
@ -191,7 +191,7 @@ class dispatcher:
ar = repr(self.addr)
except:
ar = 'no self.addr!'
return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)
def add_channel (self, map=None):
@ -324,7 +324,7 @@ class dispatcher:
# log and log_info maybe overriden to provide more sophisitcated
# logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging.
# and 'log_info' is for informational, warning and error logging.
def log (self, message):
sys.stderr.write ('log: %s\n' % str(message))
@ -433,7 +433,7 @@ def compact_traceback ():
while 1:
tbinfo.append ((
tb.tb_frame.f_code.co_filename,
tb.tb_frame.f_code.co_name,
tb.tb_frame.f_code.co_name,
str(tb.tb_lineno)
))
tb = tb.tb_next

View File

@ -2,7 +2,7 @@
atexit.py - allow programmer to define multiple exit functions to be executed
upon normal program termination.
One public function, register, is defined.
One public function, register, is defined.
"""
_exithandlers = []
@ -12,7 +12,7 @@ def _run_exitfuncs():
_exithandlers is traversed in reverse order so functions are executed
last in, first out.
"""
while _exithandlers:
func, targs, kargs = _exithandlers[-1]
apply(func, targs, kargs)
@ -51,4 +51,3 @@ if __name__ == "__main__":
register(x2, 12)
register(x3, 5, "bar")
register(x3, "no kwd args")

View File

@ -1,254 +1,254 @@
"""Classes for manipulating audio devices (currently only for Sun and SGI)"""
class error(Exception):
pass
pass
class Play_Audio_sgi:
# Private instance variables
## if 0: access frameratelist, nchannelslist, sampwidthlist, oldparams, \
## params, config, inited_outrate, inited_width, \
## inited_nchannels, port, converter, classinited: private
# Private instance variables
## if 0: access frameratelist, nchannelslist, sampwidthlist, oldparams, \
## params, config, inited_outrate, inited_width, \
## inited_nchannels, port, converter, classinited: private
classinited = 0
frameratelist = nchannelslist = sampwidthlist = None
classinited = 0
frameratelist = nchannelslist = sampwidthlist = None
def initclass(self):
import AL
self.frameratelist = [
(48000, AL.RATE_48000),
(44100, AL.RATE_44100),
(32000, AL.RATE_32000),
(22050, AL.RATE_22050),
(16000, AL.RATE_16000),
(11025, AL.RATE_11025),
( 8000, AL.RATE_8000),
]
self.nchannelslist = [
(1, AL.MONO),
(2, AL.STEREO),
(4, AL.QUADRO),
]
self.sampwidthlist = [
(1, AL.SAMPLE_8),
(2, AL.SAMPLE_16),
(3, AL.SAMPLE_24),
]
self.classinited = 1
def initclass(self):
import AL
self.frameratelist = [
(48000, AL.RATE_48000),
(44100, AL.RATE_44100),
(32000, AL.RATE_32000),
(22050, AL.RATE_22050),
(16000, AL.RATE_16000),
(11025, AL.RATE_11025),
( 8000, AL.RATE_8000),
]
self.nchannelslist = [
(1, AL.MONO),
(2, AL.STEREO),
(4, AL.QUADRO),
]
self.sampwidthlist = [
(1, AL.SAMPLE_8),
(2, AL.SAMPLE_16),
(3, AL.SAMPLE_24),
]
self.classinited = 1
def __init__(self):
import al, AL
if not self.classinited:
self.initclass()
self.oldparams = []
self.params = [AL.OUTPUT_RATE, 0]
self.config = al.newconfig()
self.inited_outrate = 0
self.inited_width = 0
self.inited_nchannels = 0
self.converter = None
self.port = None
return
def __init__(self):
import al, AL
if not self.classinited:
self.initclass()
self.oldparams = []
self.params = [AL.OUTPUT_RATE, 0]
self.config = al.newconfig()
self.inited_outrate = 0
self.inited_width = 0
self.inited_nchannels = 0
self.converter = None
self.port = None
return
def __del__(self):
if self.port:
self.stop()
if self.oldparams:
import al, AL
al.setparams(AL.DEFAULT_DEVICE, self.oldparams)
self.oldparams = []
def __del__(self):
if self.port:
self.stop()
if self.oldparams:
import al, AL
al.setparams(AL.DEFAULT_DEVICE, self.oldparams)
self.oldparams = []
def wait(self):
if not self.port:
return
import time
while self.port.getfilled() > 0:
time.sleep(0.1)
self.stop()
def wait(self):
if not self.port:
return
import time
while self.port.getfilled() > 0:
time.sleep(0.1)
self.stop()
def stop(self):
if self.port:
self.port.closeport()
self.port = None
if self.oldparams:
import al, AL
al.setparams(AL.DEFAULT_DEVICE, self.oldparams)
self.oldparams = []
def stop(self):
if self.port:
self.port.closeport()
self.port = None
if self.oldparams:
import al, AL
al.setparams(AL.DEFAULT_DEVICE, self.oldparams)
self.oldparams = []
def setoutrate(self, rate):
for (raw, cooked) in self.frameratelist:
if rate == raw:
self.params[1] = cooked
self.inited_outrate = 1
break
else:
raise error, 'bad output rate'
def setoutrate(self, rate):
for (raw, cooked) in self.frameratelist:
if rate == raw:
self.params[1] = cooked
self.inited_outrate = 1
break
else:
raise error, 'bad output rate'
def setsampwidth(self, width):
for (raw, cooked) in self.sampwidthlist:
if width == raw:
self.config.setwidth(cooked)
self.inited_width = 1
break
else:
if width == 0:
import AL
self.inited_width = 0
self.config.setwidth(AL.SAMPLE_16)
self.converter = self.ulaw2lin
else:
raise error, 'bad sample width'
def setsampwidth(self, width):
for (raw, cooked) in self.sampwidthlist:
if width == raw:
self.config.setwidth(cooked)
self.inited_width = 1
break
else:
if width == 0:
import AL
self.inited_width = 0
self.config.setwidth(AL.SAMPLE_16)
self.converter = self.ulaw2lin
else:
raise error, 'bad sample width'
def setnchannels(self, nchannels):
for (raw, cooked) in self.nchannelslist:
if nchannels == raw:
self.config.setchannels(cooked)
self.inited_nchannels = 1
break
else:
raise error, 'bad # of channels'
def setnchannels(self, nchannels):
for (raw, cooked) in self.nchannelslist:
if nchannels == raw:
self.config.setchannels(cooked)
self.inited_nchannels = 1
break
else:
raise error, 'bad # of channels'
def writeframes(self, data):
if not (self.inited_outrate and self.inited_nchannels):
raise error, 'params not specified'
if not self.port:
import al, AL
self.port = al.openport('Python', 'w', self.config)
self.oldparams = self.params[:]
al.getparams(AL.DEFAULT_DEVICE, self.oldparams)
al.setparams(AL.DEFAULT_DEVICE, self.params)
if self.converter:
data = self.converter(data)
self.port.writesamps(data)
def writeframes(self, data):
if not (self.inited_outrate and self.inited_nchannels):
raise error, 'params not specified'
if not self.port:
import al, AL
self.port = al.openport('Python', 'w', self.config)
self.oldparams = self.params[:]
al.getparams(AL.DEFAULT_DEVICE, self.oldparams)
al.setparams(AL.DEFAULT_DEVICE, self.params)
if self.converter:
data = self.converter(data)
self.port.writesamps(data)
def getfilled(self):
if self.port:
return self.port.getfilled()
else:
return 0
def getfilled(self):
if self.port:
return self.port.getfilled()
else:
return 0
def getfillable(self):
if self.port:
return self.port.getfillable()
else:
return self.config.getqueuesize()
def getfillable(self):
if self.port:
return self.port.getfillable()
else:
return self.config.getqueuesize()
# private methods
## if 0: access *: private
# private methods
## if 0: access *: private
def ulaw2lin(self, data):
import audioop
return audioop.ulaw2lin(data, 2)
def ulaw2lin(self, data):
import audioop
return audioop.ulaw2lin(data, 2)
class Play_Audio_sun:
## if 0: access outrate, sampwidth, nchannels, inited_outrate, inited_width, \
## inited_nchannels, converter: private
## if 0: access outrate, sampwidth, nchannels, inited_outrate, inited_width, \
## inited_nchannels, converter: private
def __init__(self):
self.outrate = 0
self.sampwidth = 0
self.nchannels = 0
self.inited_outrate = 0
self.inited_width = 0
self.inited_nchannels = 0
self.converter = None
self.port = None
return
def __init__(self):
self.outrate = 0
self.sampwidth = 0
self.nchannels = 0
self.inited_outrate = 0
self.inited_width = 0
self.inited_nchannels = 0
self.converter = None
self.port = None
return
def __del__(self):
self.stop()
def __del__(self):
self.stop()
def setoutrate(self, rate):
self.outrate = rate
self.inited_outrate = 1
def setoutrate(self, rate):
self.outrate = rate
self.inited_outrate = 1
def setsampwidth(self, width):
self.sampwidth = width
self.inited_width = 1
def setsampwidth(self, width):
self.sampwidth = width
self.inited_width = 1
def setnchannels(self, nchannels):
self.nchannels = nchannels
self.inited_nchannels = 1
def setnchannels(self, nchannels):
self.nchannels = nchannels
self.inited_nchannels = 1
def writeframes(self, data):
if not (self.inited_outrate and self.inited_width and self.inited_nchannels):
raise error, 'params not specified'
if not self.port:
import sunaudiodev, SUNAUDIODEV
self.port = sunaudiodev.open('w')
info = self.port.getinfo()
info.o_sample_rate = self.outrate
info.o_channels = self.nchannels
if self.sampwidth == 0:
info.o_precision = 8
self.o_encoding = SUNAUDIODEV.ENCODING_ULAW
# XXX Hack, hack -- leave defaults
else:
info.o_precision = 8 * self.sampwidth
info.o_encoding = SUNAUDIODEV.ENCODING_LINEAR
self.port.setinfo(info)
if self.converter:
data = self.converter(data)
self.port.write(data)
def writeframes(self, data):
if not (self.inited_outrate and self.inited_width and self.inited_nchannels):
raise error, 'params not specified'
if not self.port:
import sunaudiodev, SUNAUDIODEV
self.port = sunaudiodev.open('w')
info = self.port.getinfo()
info.o_sample_rate = self.outrate
info.o_channels = self.nchannels
if self.sampwidth == 0:
info.o_precision = 8
self.o_encoding = SUNAUDIODEV.ENCODING_ULAW
# XXX Hack, hack -- leave defaults
else:
info.o_precision = 8 * self.sampwidth
info.o_encoding = SUNAUDIODEV.ENCODING_LINEAR
self.port.setinfo(info)
if self.converter:
data = self.converter(data)
self.port.write(data)
def wait(self):
if not self.port:
return
self.port.drain()
self.stop()
def wait(self):
if not self.port:
return
self.port.drain()
self.stop()
def stop(self):
if self.port:
self.port.flush()
self.port.close()
self.port = None
def stop(self):
if self.port:
self.port.flush()
self.port.close()
self.port = None
def getfilled(self):
if self.port:
return self.port.obufcount()
else:
return 0
def getfilled(self):
if self.port:
return self.port.obufcount()
else:
return 0
def getfillable(self):
return BUFFERSIZE - self.getfilled()
def getfillable(self):
return BUFFERSIZE - self.getfilled()
def AudioDev():
# Dynamically try to import and use a platform specific module.
try:
import al
except ImportError:
try:
import sunaudiodev
return Play_Audio_sun()
except ImportError:
try:
import Audio_mac
except ImportError:
raise error, 'no audio device'
else:
return Audio_mac.Play_Audio_mac()
else:
return Play_Audio_sgi()
# Dynamically try to import and use a platform specific module.
try:
import al
except ImportError:
try:
import sunaudiodev
return Play_Audio_sun()
except ImportError:
try:
import Audio_mac
except ImportError:
raise error, 'no audio device'
else:
return Audio_mac.Play_Audio_mac()
else:
return Play_Audio_sgi()
def test(fn = None):
import sys
if sys.argv[1:]:
fn = sys.argv[1]
else:
fn = 'f:just samples:just.aif'
import aifc
af = aifc.open(fn, 'r')
print fn, af.getparams()
p = AudioDev()
p.setoutrate(af.getframerate())
p.setsampwidth(af.getsampwidth())
p.setnchannels(af.getnchannels())
BUFSIZ = af.getframerate()/af.getsampwidth()/af.getnchannels()
while 1:
data = af.readframes(BUFSIZ)
if not data: break
print len(data)
p.writeframes(data)
p.wait()
import sys
if sys.argv[1:]:
fn = sys.argv[1]
else:
fn = 'f:just samples:just.aif'
import aifc
af = aifc.open(fn, 'r')
print fn, af.getparams()
p = AudioDev()
p.setoutrate(af.getframerate())
p.setsampwidth(af.getsampwidth())
p.setnchannels(af.getnchannels())
BUFSIZ = af.getframerate()/af.getsampwidth()/af.getnchannels()
while 1:
data = af.readframes(BUFSIZ)
if not data: break
print len(data)
p.writeframes(data)
p.wait()
if __name__ == '__main__':
test()
test()