Remove cl usage from aifc and use bytes throughout.

This module needs a test suite!
This commit is contained in:
Georg Brandl 2008-06-07 19:01:03 +00:00
parent d701c8cc7c
commit 2095cfea89
1 changed files with 77 additions and 147 deletions

View File

@ -137,15 +137,15 @@ writeframesraw.
import struct
import builtins
__all__ = ["Error","open","openfp"]
__all__ = ["Error", "open", "openfp"]
class Error(Exception):
pass
_AIFC_version = 0xA2805140 # Version 1 of AIFF-C
_skiplist = 'COMT', 'INST', 'MIDI', 'AESD', \
'APPL', 'NAME', 'AUTH', '(c) ', 'ANNO'
_skiplist = b'COMT', b'INST', b'MIDI', b'AESD', \
b'APPL', b'NAME', b'AUTH', b'(c) ', b'ANNO'
def _read_long(file):
try:
@ -168,7 +168,7 @@ def _read_short(file):
def _read_string(file):
length = ord(file.read(1))
if length == 0:
data = ''
data = b''
else:
data = file.read(length)
if length & 1 == 0:
@ -203,10 +203,10 @@ def _write_long(f, x):
def _write_string(f, s):
if len(s) > 255:
raise ValueError("string exceeds maximum pstring length")
f.write(chr(len(s)))
f.write(struct.pack('b', len(s)))
f.write(s)
if len(s) & 1 == 0:
f.write(chr(0))
f.write(b'\x00')
def _write_float(f, x):
import math
@ -281,17 +281,16 @@ class Aifc_read:
def initfp(self, file):
self._version = 0
self._decomp = None
self._convert = None
self._markers = []
self._soundpos = 0
self._file = Chunk(file)
if self._file.getname() != 'FORM':
if self._file.getname() != b'FORM':
raise Error('file does not start with FORM id')
formdata = self._file.read(4)
if formdata == 'AIFF':
if formdata == b'AIFF':
self._aifc = 0
elif formdata == 'AIFC':
elif formdata == b'AIFC':
self._aifc = 1
else:
raise Error('not an AIFF or AIFF-C file')
@ -303,39 +302,28 @@ class Aifc_read:
except EOFError:
break
chunkname = chunk.getname()
if chunkname == 'COMM':
if chunkname == b'COMM':
self._read_comm_chunk(chunk)
self._comm_chunk_read = 1
elif chunkname == 'SSND':
elif chunkname == b'SSND':
self._ssnd_chunk = chunk
dummy = chunk.read(8)
self._ssnd_seek_needed = 0
elif chunkname == 'FVER':
elif chunkname == b'FVER':
self._version = _read_ulong(chunk)
elif chunkname == 'MARK':
elif chunkname == b'MARK':
self._readmark(chunk)
elif chunkname in _skiplist:
pass
else:
raise Error('unrecognized chunk type '+chunk.chunkname)
raise Error('unrecognized chunk type ' +
chunkname.decode('latin1'))
chunk.skip()
if not self._comm_chunk_read or not self._ssnd_chunk:
raise Error('COMM chunk and/or SSND chunk missing')
if self._aifc and self._decomp:
import cl
params = [cl.ORIGINAL_FORMAT, 0,
cl.BITS_PER_COMPONENT, self._sampwidth * 8,
cl.FRAME_RATE, self._framerate]
if self._nchannels == 1:
params[1] = cl.MONO
elif self._nchannels == 2:
params[1] = cl.STEREO_INTERLEAVED
else:
raise Error('cannot compress more than 2 channels')
self._decomp.SetParams(params)
def __init__(self, f):
if type(f) == type(''):
if isinstance(f, str):
f = builtins.open(f, 'rb')
# else, assume it is an open file object already
self.initfp(f)
@ -351,9 +339,6 @@ class Aifc_read:
self._soundpos = 0
def close(self):
if self._decomp:
self._decomp.CloseDecompressor()
self._decomp = None
self._file = None
def tell(self):
@ -394,7 +379,7 @@ class Aifc_read:
for marker in self._markers:
if id == marker[0]:
return marker
raise Error('marker %r does not exist' % (id,))
raise Error('marker {0!r} does not exist'.format(id))
def setpos(self, pos):
if pos < 0 or pos > self._nframes:
@ -411,23 +396,21 @@ class Aifc_read:
self._ssnd_chunk.seek(pos + 8)
self._ssnd_seek_needed = 0
if nframes == 0:
return ''
return b''
data = self._ssnd_chunk.read(nframes * self._framesize)
if self._convert and data:
data = self._convert(data)
self._soundpos = self._soundpos + len(data) / (self._nchannels * self._sampwidth)
self._soundpos = self._soundpos + len(data) // (self._nchannels
* self._sampwidth)
return data
#
# Internal methods.
#
def _decomp_data(self, data):
import cl
dummy = self._decomp.SetParam(cl.FRAME_BUFFER_SIZE,
len(data) * 2)
return self._decomp.Decompress(len(data) / self._nchannels,
data)
def _alaw2lin(self, data):
import audioop
return audioop.alaw2lin(data, 2)
def _ulaw2lin(self, data):
import audioop
@ -438,14 +421,13 @@ class Aifc_read:
if not hasattr(self, '_adpcmstate'):
# first time
self._adpcmstate = None
data, self._adpcmstate = audioop.adpcm2lin(data, 2,
self._adpcmstate)
data, self._adpcmstate = audioop.adpcm2lin(data, 2, self._adpcmstate)
return data
def _read_comm_chunk(self, chunk):
self._nchannels = _read_short(chunk)
self._nframes = _read_long(chunk)
self._sampwidth = (_read_short(chunk) + 7) / 8
self._sampwidth = (_read_short(chunk) + 7) // 8
self._framerate = int(_read_float(chunk))
self._framesize = self._nchannels * self._sampwidth
if self._aifc:
@ -466,42 +448,21 @@ class Aifc_read:
chunk.file.seek(-1, 1)
#DEBUG end
self._compname = _read_string(chunk)
if self._comptype != 'NONE':
if self._comptype == 'G722':
try:
import audioop
except ImportError:
pass
else:
self._convert = self._adpcm2lin
self._framesize = self._framesize / 4
return
# for ULAW and ALAW try Compression Library
try:
import cl
except ImportError:
if self._comptype == 'ULAW':
try:
import audioop
self._convert = self._ulaw2lin
self._framesize = self._framesize / 2
return
except ImportError:
pass
raise Error('cannot read compressed AIFF-C files')
if self._comptype == 'ULAW':
scheme = cl.G711_ULAW
self._framesize = self._framesize / 2
elif self._comptype == 'ALAW':
scheme = cl.G711_ALAW
self._framesize = self._framesize / 2
if self._comptype != b'NONE':
if self._comptype == b'G722':
self._convert = self._adpcm2lin
self._framesize = self._framesize // 4
elif self._comptype in (b'ulaw', b'ULAW'):
self._convert = self._ulaw2lin
self._framesize = self._framesize // 2
elif self._comptype in (b'alaw', b'ALAW'):
self._convert = self._alaw2lin
self._framesize = self._framesize // 2
else:
raise Error('unsupported compression type')
self._decomp = cl.OpenDecompressor(scheme)
self._convert = self._decomp_data
else:
self._comptype = 'NONE'
self._compname = 'not compressed'
self._comptype = b'NONE'
self._compname = b'not compressed'
def _readmark(self, chunk):
nmarkers = _read_short(chunk)
@ -555,7 +516,7 @@ class Aifc_write:
# _datawritten -- the size of the audio samples actually written
def __init__(self, f):
if type(f) == type(''):
if isinstance(f, str):
filename = f
f = builtins.open(f, 'wb')
else:
@ -570,9 +531,8 @@ class Aifc_write:
def initfp(self, file):
self._file = file
self._version = _AIFC_version
self._comptype = 'NONE'
self._compname = 'not compressed'
self._comp = None
self._comptype = b'NONE'
self._compname = b'not compressed'
self._convert = None
self._nchannels = 0
self._sampwidth = 0
@ -649,7 +609,8 @@ class Aifc_write:
def setcomptype(self, comptype, compname):
if self._nframeswritten:
raise Error('cannot change parameters after starting to write')
if comptype not in ('NONE', 'ULAW', 'ALAW', 'G722'):
if comptype not in (b'NONE', b'ulaw', b'ULAW',
b'alaw', b'ALAW', b'G722'):
raise Error('unsupported compression type')
self._comptype = comptype
self._compname = compname
@ -669,7 +630,8 @@ class Aifc_write:
nchannels, sampwidth, framerate, nframes, comptype, compname = params
if self._nframeswritten:
raise Error('cannot change parameters after starting to write')
if comptype not in ('NONE', 'ULAW', 'ALAW', 'G722'):
if comptype not in (b'NONE', b'ulaw', b'ULAW',
b'alaw', b'ALAW', b'G722'):
raise Error('unsupported compression type')
self.setnchannels(nchannels)
self.setsampwidth(sampwidth)
@ -688,7 +650,7 @@ class Aifc_write:
raise Error('marker ID must be > 0')
if pos < 0:
raise Error('marker position must be >= 0')
if type(name) != type(''):
if not isinstance(name, str):
raise Error('marker name must be a string')
for i in range(len(self._markers)):
if id == self._markers[i][0]:
@ -700,7 +662,7 @@ class Aifc_write:
for marker in self._markers:
if id == marker[0]:
return marker
raise Error('marker %r does not exist' % (id,))
raise Error('marker {0!r} does not exist'.format(id))
def getmarkers(self):
if len(self._markers) == 0:
@ -712,7 +674,7 @@ class Aifc_write:
def writeframesraw(self, data):
self._ensure_header_written(len(data))
nframes = len(data) / (self._sampwidth * self._nchannels)
nframes = len(data) // (self._sampwidth * self._nchannels)
if self._convert:
data = self._convert(data)
self._file.write(data)
@ -729,16 +691,13 @@ class Aifc_write:
self._ensure_header_written(0)
if self._datawritten & 1:
# quick pad to even size
self._file.write(chr(0))
self._file.write(b'\x00')
self._datawritten = self._datawritten + 1
self._writemarkers()
if self._nframeswritten != self._nframes or \
self._datalength != self._datawritten or \
self._marklength:
self._patchheader()
if self._comp:
self._comp.CloseCompressor()
self._comp = None
self._file.flush()
self._file = None
@ -746,11 +705,9 @@ class Aifc_write:
# Internal methods.
#
def _comp_data(self, data):
import cl
dummy = self._comp.SetParam(cl.FRAME_BUFFER_SIZE, len(data))
dummy = self._comp.SetParam(cl.COMPRESSED_BUFFER_SIZE, len(data))
return self._comp.Compress(self._nframes, data)
def _lin2alaw(self, data):
import audioop
return audioop.lin2alaw(data, 2)
def _lin2ulaw(self, data):
import audioop
@ -760,22 +717,23 @@ class Aifc_write:
import audioop
if not hasattr(self, '_adpcmstate'):
self._adpcmstate = None
data, self._adpcmstate = audioop.lin2adpcm(data, 2,
self._adpcmstate)
data, self._adpcmstate = audioop.lin2adpcm(data, 2, self._adpcmstate)
return data
def _ensure_header_written(self, datasize):
if not self._nframeswritten:
if self._comptype in ('ULAW', 'ALAW'):
if self._comptype in (b'ULAW', b'ALAW'):
if not self._sampwidth:
self._sampwidth = 2
if self._sampwidth != 2:
raise Error('sample width must be 2 when compressing with ULAW or ALAW')
if self._comptype == 'G722':
raise Error('sample width must be 2 when compressing '
'with ulaw/ULAW or alaw/ALAW')
if self._comptype == b'G722':
if not self._sampwidth:
self._sampwidth = 2
if self._sampwidth != 2:
raise Error('sample width must be 2 when compressing with G7.22 (ADPCM)')
raise Error('sample width must be 2 when compressing '
'with G7.22 (ADPCM)')
if not self._nchannels:
raise Error('# channels not specified')
if not self._sampwidth:
@ -785,71 +743,43 @@ class Aifc_write:
self._write_header(datasize)
def _init_compression(self):
if self._comptype == 'G722':
if self._comptype == b'G722':
self._convert = self._lin2adpcm
return
try:
import cl
except ImportError:
if self._comptype == 'ULAW':
try:
import audioop
self._convert = self._lin2ulaw
return
except ImportError:
pass
raise Error('cannot write compressed AIFF-C files')
if self._comptype == 'ULAW':
scheme = cl.G711_ULAW
elif self._comptype == 'ALAW':
scheme = cl.G711_ALAW
elif self._comptype in (b'ulaw', b'ULAW'):
self._convert = self._lin2ulaw
elif self._comptype in (b'alaw', b'ALAW'):
self._convert = self._lin2alaw
else:
raise Error('unsupported compression type')
self._comp = cl.OpenCompressor(scheme)
params = [cl.ORIGINAL_FORMAT, 0,
cl.BITS_PER_COMPONENT, self._sampwidth * 8,
cl.FRAME_RATE, self._framerate,
cl.FRAME_BUFFER_SIZE, 100,
cl.COMPRESSED_BUFFER_SIZE, 100]
if self._nchannels == 1:
params[1] = cl.MONO
elif self._nchannels == 2:
params[1] = cl.STEREO_INTERLEAVED
else:
raise Error('cannot compress more than 2 channels')
self._comp.SetParams(params)
# the compressor produces a header which we ignore
dummy = self._comp.Compress(0, '')
self._convert = self._comp_data
def _write_header(self, initlength):
if self._aifc and self._comptype != 'NONE':
if self._aifc and self._comptype != b'NONE':
self._init_compression()
self._file.write('FORM')
self._file.write(b'FORM')
if not self._nframes:
self._nframes = initlength / (self._nchannels * self._sampwidth)
self._nframes = initlength // (self._nchannels * self._sampwidth)
self._datalength = self._nframes * self._nchannels * self._sampwidth
if self._datalength & 1:
self._datalength = self._datalength + 1
if self._aifc:
if self._comptype in ('ULAW', 'ALAW'):
self._datalength = self._datalength / 2
if self._comptype in (b'ulaw', b'ULAW', b'alaw', b'ALAW'):
self._datalength = self._datalength // 2
if self._datalength & 1:
self._datalength = self._datalength + 1
elif self._comptype == 'G722':
self._datalength = (self._datalength + 3) / 4
elif self._comptype == b'G722':
self._datalength = (self._datalength + 3) // 4
if self._datalength & 1:
self._datalength = self._datalength + 1
self._form_length_pos = self._file.tell()
commlength = self._write_form_length(self._datalength)
if self._aifc:
self._file.write('AIFC')
self._file.write('FVER')
self._file.write(b'AIFC')
self._file.write(b'FVER')
_write_long(self._file, 4)
_write_long(self._file, self._version)
else:
self._file.write('AIFF')
self._file.write('COMM')
self._file.write(b'AIFF')
self._file.write(b'COMM')
_write_long(self._file, commlength)
_write_short(self._file, self._nchannels)
self._nframes_pos = self._file.tell()
@ -859,7 +789,7 @@ class Aifc_write:
if self._aifc:
self._file.write(self._comptype)
_write_string(self._file, self._compname)
self._file.write('SSND')
self._file.write(b'SSND')
self._ssnd_length_pos = self._file.tell()
_write_long(self._file, self._datalength + 8)
_write_long(self._file, 0)
@ -882,7 +812,7 @@ class Aifc_write:
curpos = self._file.tell()
if self._datawritten & 1:
datalength = self._datawritten + 1
self._file.write(chr(0))
self._file.write(b'\x00')
else:
datalength = self._datawritten
if datalength == self._datalength and \
@ -903,7 +833,7 @@ class Aifc_write:
def _writemarkers(self):
if len(self._markers) == 0:
return
self._file.write('MARK')
self._file.write(b'MARK')
length = 2
for marker in self._markers:
id, pos, name = marker