SF patch# 1770008 by Christian Heimes (plus some extras).

Completely get rid of StringIO.py and cStringIO.c.

I had to fix a few tests and modules beyond what Christian did, and
invent a few conventions.  E.g. in elementtree, I chose to
write/return Unicode strings whe no encoding is given, but bytes when
an explicit encoding is given.  Also mimetools was made to always
assume binary files.
This commit is contained in:
Guido van Rossum 2007-08-09 01:03:29 +00:00
parent 918f49e645
commit 34d1928766
78 changed files with 312 additions and 657 deletions

View File

@ -19,7 +19,7 @@ isn't part of a group of similar items is not modified.
__version__ = '$Revision$'
import re
import StringIO
import io
import sys
@ -50,7 +50,7 @@ def process(ifn, ofn=None):
ifp = open(ifn)
if ofn is None:
ofn = ifn
ofp = StringIO.StringIO()
ofp = io.StringIO()
entries = []
match = breakable_re.match
write = ofp.write

View File

@ -1055,8 +1055,8 @@ def main():
ofp = sys.stdout
elif len(sys.argv) == 3:
ifp = open(sys.argv[1])
import StringIO
ofp = StringIO.StringIO()
import io
ofp = io.StringIO()
else:
usage()
sys.exit(2)

View File

@ -1,324 +0,0 @@
r"""File-like objects that read from or write to a string buffer.
This implements (nearly) all stdio methods.
f = StringIO() # ready for writing
f = StringIO(buf) # ready for reading
f.close() # explicitly release resources held
flag = f.isatty() # always false
pos = f.tell() # get current position
f.seek(pos) # set current position
f.seek(pos, mode) # mode 0: absolute; 1: relative; 2: relative to EOF
buf = f.read() # read until EOF
buf = f.read(n) # read up to n bytes
buf = f.readline() # read until end of line ('\n') or EOF
list = f.readlines()# list of f.readline() results until EOF
f.truncate([size]) # truncate file at to at most size (default: current pos)
f.write(buf) # write at current position
f.writelines(list) # for line in list: f.write(line)
f.getvalue() # return whole file's contents as a string
Notes:
- Using a real file is often faster (but less convenient).
- There's also a much faster implementation in C, called cStringIO, but
it's not subclassable.
- fileno() is left unimplemented so that code which uses it triggers
an exception early.
- Seeking far beyond EOF and then writing will insert real null
bytes that occupy space in the buffer.
- There's a simple test set (see end of this file).
"""
try:
from errno import EINVAL
except ImportError:
EINVAL = 22
__all__ = ["StringIO"]
def _complain_ifclosed(closed):
if closed:
raise ValueError, "I/O operation on closed file"
class StringIO:
"""class StringIO([buffer])
When a StringIO object is created, it can be initialized to an existing
string by passing the string to the constructor. If no string is given,
the StringIO will start empty.
The StringIO object can accept either Unicode or 8-bit strings, but
mixing the two may take some care. If both are used, 8-bit strings that
cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause
a UnicodeError to be raised when getvalue() is called.
"""
def __init__(self, buf = ''):
# Force self.buf to be a string or unicode
if not isinstance(buf, basestring):
buf = str(buf)
self.buf = buf
self.len = len(buf)
self.buflist = []
self.pos = 0
self.closed = False
def __iter__(self):
return self
def __next__(self):
"""A file object is its own iterator, for example iter(f) returns f
(unless f is closed). When a file is used as an iterator, typically
in a for loop (for example, for line in f: print line), the __next__()
method is called repeatedly. This method returns the next input line,
or raises StopIteration when EOF is hit.
"""
_complain_ifclosed(self.closed)
r = self.readline()
if not r:
raise StopIteration
return r
def close(self):
"""Free the memory buffer.
"""
if not self.closed:
self.closed = True
del self.buf, self.pos
def isatty(self):
"""Returns False because StringIO objects are not connected to a
tty-like device.
"""
_complain_ifclosed(self.closed)
return False
def seek(self, pos, mode = 0):
"""Set the file's current position.
The mode argument is optional and defaults to 0 (absolute file
positioning); other values are 1 (seek relative to the current
position) and 2 (seek relative to the file's end).
There is no return value.
"""
_complain_ifclosed(self.closed)
if self.buflist:
self.buf += ''.join(self.buflist)
self.buflist = []
if mode == 1:
pos += self.pos
elif mode == 2:
pos += self.len
self.pos = max(0, pos)
def tell(self):
"""Return the file's current position."""
_complain_ifclosed(self.closed)
return self.pos
def read(self, n=None):
"""Read at most size bytes from the file
(less if the read hits EOF before obtaining size bytes).
If the size argument is negative or omitted, read all data until EOF
is reached. The bytes are returned as a string object. An empty
string is returned when EOF is encountered immediately.
"""
_complain_ifclosed(self.closed)
if self.buflist:
self.buf += ''.join(self.buflist)
self.buflist = []
if n is None:
n = -1
if n < 0:
newpos = self.len
else:
newpos = min(self.pos+n, self.len)
r = self.buf[self.pos:newpos]
self.pos = newpos
return r
def readline(self, length=None):
r"""Read one entire line from the file.
A trailing newline character is kept in the string (but may be absent
when a file ends with an incomplete line). If the size argument is
present and non-negative, it is a maximum byte count (including the
trailing newline) and an incomplete line may be returned.
An empty string is returned only when EOF is encountered immediately.
Note: Unlike stdio's fgets(), the returned string contains null
characters ('\0') if they occurred in the input.
"""
_complain_ifclosed(self.closed)
if self.buflist:
self.buf += ''.join(self.buflist)
self.buflist = []
i = self.buf.find('\n', self.pos)
if i < 0:
newpos = self.len
else:
newpos = i+1
if length is not None:
if self.pos + length < newpos:
newpos = self.pos + length
r = self.buf[self.pos:newpos]
self.pos = newpos
return r
def readlines(self, sizehint = 0):
"""Read until EOF using readline() and return a list containing the
lines thus read.
If the optional sizehint argument is present, instead of reading up
to EOF, whole lines totalling approximately sizehint bytes (or more
to accommodate a final whole line).
"""
total = 0
lines = []
line = self.readline()
while line:
lines.append(line)
total += len(line)
if 0 < sizehint <= total:
break
line = self.readline()
return lines
def truncate(self, size=None):
"""Truncate the file's size.
If the optional size argument is present, the file is truncated to
(at most) that size. The size defaults to the current position.
The current file position is not changed unless the position
is beyond the new file size.
If the specified size exceeds the file's current size, the
file remains unchanged.
"""
_complain_ifclosed(self.closed)
if size is None:
size = self.pos
elif size < 0:
raise IOError(EINVAL, "Negative size not allowed")
elif size < self.pos:
self.pos = size
self.buf = self.getvalue()[:size]
self.len = size
def write(self, s):
"""Write a string to the file.
There is no return value.
"""
_complain_ifclosed(self.closed)
if not s: return
# Force s to be a string or unicode
if not isinstance(s, basestring):
s = str(s)
spos = self.pos
slen = self.len
if spos == slen:
self.buflist.append(s)
self.len = self.pos = spos + len(s)
return
if spos > slen:
self.buflist.append('\0'*(spos - slen))
slen = spos
newpos = spos + len(s)
if spos < slen:
if self.buflist:
self.buf += ''.join(self.buflist)
self.buflist = [self.buf[:spos], s, self.buf[newpos:]]
self.buf = ''
if newpos > slen:
slen = newpos
else:
self.buflist.append(s)
slen = newpos
self.len = slen
self.pos = newpos
def writelines(self, iterable):
"""Write a sequence of strings to the file. The sequence can be any
iterable object producing strings, typically a list of strings. There
is no return value.
(The name is intended to match readlines(); writelines() does not add
line separators.)
"""
write = self.write
for line in iterable:
write(line)
def flush(self):
"""Flush the internal buffer
"""
_complain_ifclosed(self.closed)
def getvalue(self):
"""
Retrieve the entire contents of the "file" at any time before
the StringIO object's close() method is called.
The StringIO object can accept either Unicode or 8-bit strings,
but mixing the two may take some care. If both are used, 8-bit
strings that cannot be interpreted as 7-bit ASCII (that use the
8th bit) will cause a UnicodeError to be raised when getvalue()
is called.
"""
if self.buflist:
self.buf += ''.join(self.buflist)
self.buflist = []
return self.buf
# A little test suite
def test():
import sys
if sys.argv[1:]:
file = sys.argv[1]
else:
file = '/etc/passwd'
lines = open(file, 'r').readlines()
text = open(file, 'r').read()
f = StringIO()
for line in lines[:-2]:
f.write(line)
f.writelines(lines[-2:])
if f.getvalue() != text:
raise RuntimeError, 'write failed'
length = f.tell()
print('File length =', length)
f.seek(len(lines[0]))
f.write(lines[1])
f.seek(0)
print('First line =', repr(f.readline()))
print('Position =', f.tell())
line = f.readline()
print('Second line =', repr(line))
f.seek(-len(line), 1)
line2 = f.read(len(line))
if line != line2:
raise RuntimeError, 'bad result after seek back'
f.seek(len(line2), 1)
list = f.readlines()
line = list[-1]
f.seek(f.tell() - len(line))
line2 = f.read()
if line != line2:
raise RuntimeError, 'bad result after seek back from EOF'
print('Read', len(list), 'more lines')
print('File length =', f.tell())
if f.tell() != length:
raise RuntimeError, 'bad length'
f.truncate(length/2)
f.seek(0, 2)
print('Truncated length =', f.tell())
if f.tell() != length/2:
raise RuntimeError, 'truncate did not adjust length'
f.close()
if __name__ == '__main__':
test()

View File

@ -3,7 +3,7 @@ TestCases for python DB Btree key comparison function.
"""
import sys, os, re
from cStringIO import StringIO
from io import StringIO
from . import test_all

View File

@ -58,7 +58,7 @@ def _warn_unhandled_exception():
# catching input that's bad in unexpected ways. Warn if any
# exceptions are caught there.
import warnings, traceback, StringIO
f = StringIO.StringIO()
f = io.StringIO()
traceback.print_exc(None, f)
msg = f.getvalue()
warnings.warn("cookielib bug!\n%s" % msg, stacklevel=2)

View File

@ -37,9 +37,9 @@ class CallbackTracbackTestCase(unittest.TestCase):
def capture_stderr(self, func, *args, **kw):
# helper - call function 'func', and return the captured stderr
import StringIO
import io
old_stderr = sys.stderr
logger = sys.stderr = StringIO.StringIO()
logger = sys.stderr = io.StringIO()
try:
func(*args, **kw)
finally:

View File

@ -8,7 +8,7 @@ Implements the Distutils 'register' command (register with the repository).
__revision__ = "$Id$"
import sys, os, urllib2, getpass, urlparse
import StringIO, ConfigParser
import io, ConfigParser
from distutils.core import Command
from distutils.errors import *
@ -253,7 +253,7 @@ Your selection [default 1]: ''', end=' ')
boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
sep_boundary = '\n--' + boundary
end_boundary = sep_boundary + '--'
body = StringIO.StringIO()
body = io.StringIO()
for key, value in data.items():
# handle multiple entries for the same name
if type(value) not in (type([]), type( () )):

View File

@ -14,7 +14,6 @@ import ConfigParser
import httplib
import base64
import urlparse
import cStringIO as StringIO
class upload(Command):
@ -135,7 +134,7 @@ class upload(Command):
boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
sep_boundary = '\n--' + boundary
end_boundary = sep_boundary + '--'
body = StringIO.StringIO()
body = io.StringIO()
for key, value in data.items():
# handle multiple entries for the same name
if type(value) != type([]):

View File

@ -2,7 +2,7 @@
import os
import sys
import StringIO
import io
import unittest
from distutils.command.build_py import build_py
@ -69,7 +69,7 @@ class BuildPyTestCase(support.TempdirManager,
open(os.path.join(testdir, "testfile"), "w").close()
os.chdir(sources)
sys.stdout = StringIO.StringIO()
sys.stdout = io.StringIO()
try:
dist = Distribution({"packages": ["pkg"],

View File

@ -4,7 +4,7 @@ import distutils.cmd
import distutils.dist
import os
import shutil
import StringIO
import io
import sys
import tempfile
import unittest
@ -177,7 +177,7 @@ class MetadataTestCase(unittest.TestCase):
"obsoletes": ["my.pkg (splat)"]})
def format_metadata(self, dist):
sio = StringIO.StringIO()
sio = io.StringIO()
dist.metadata.write_pkg_file(sio)
return sio.getvalue()

View File

@ -98,7 +98,7 @@ import __future__
import sys, traceback, inspect, linecache, os, re
import unittest, difflib, pdb, tempfile
import warnings
from StringIO import StringIO
from io import StringIO
# There are 4 basic classes:
# - Example: a <source, want> pair, plus an intra-docstring line number.

View File

@ -12,7 +12,7 @@ import time
import random
import warnings
from cStringIO import StringIO
from io import StringIO
from email.header import Header
UNDERSCORE = '_'

View File

@ -12,7 +12,7 @@ __all__ = [
]
import sys
from cStringIO import StringIO
from io import StringIO

View File

@ -10,7 +10,7 @@ import re
import uu
import binascii
import warnings
from cStringIO import StringIO
from io import StringIO
# Intrapackage imports
import email.charset

View File

@ -8,7 +8,7 @@ __all__ = ['MIMEAudio']
import sndhdr
from cStringIO import StringIO
from io import StringIO
from email import encoders
from email.mime.nonmultipart import MIMENonMultipart

View File

@ -7,7 +7,7 @@
__all__ = ['Parser', 'HeaderParser']
import warnings
from cStringIO import StringIO
from io import StringIO
from email.feedparser import FeedParser
from email.message import Message

View File

@ -9,7 +9,7 @@ import base64
import difflib
import unittest
import warnings
from cStringIO import StringIO
from io import StringIO
import email

View File

@ -9,7 +9,7 @@ import base64
import difflib
import unittest
import warnings
from cStringIO import StringIO
from io import StringIO
import email

View File

@ -9,7 +9,7 @@
import sys
import os
import unittest
from cStringIO import StringIO
from io import StringIO
from types import ListType
from email.test.test_email import TestEmailBase

View File

@ -27,7 +27,7 @@ import random
import socket
import urllib
import warnings
from cStringIO import StringIO
from io import StringIO
from email._parseaddr import quote
from email._parseaddr import AddressList as _AddressList

View File

@ -303,6 +303,8 @@ class IOBase:
Returns False if we don't know.
"""
if self.closed:
raise ValueError("isatty() on closed file")
return False
### Readline[s] and writelines ###
@ -1239,8 +1241,11 @@ class StringIO(TextIOWrapper):
encoding=encoding,
newline=newline)
if initial_value:
if not isinstance(initial_value, basestring):
initial_value = str(initial_value)
self.write(initial_value)
self.seek(0)
def getvalue(self):
self.flush()
return self.buffer.getvalue().decode(self._encoding)

View File

@ -26,7 +26,7 @@ Copyright (C) 2001-2007 Vinay Sajip. All Rights Reserved.
To use, simply 'import logging' and log away!
"""
import sys, os, time, cStringIO, traceback
import sys, os, time, io, traceback
try:
import codecs
@ -396,7 +396,7 @@ class Formatter:
This default implementation just uses
traceback.print_exception()
"""
sio = cStringIO.StringIO()
sio = io.StringIO()
traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
s = sio.getvalue()
sio.close()

View File

@ -19,7 +19,7 @@ import email
import email.message
import email.generator
import rfc822
import StringIO
import io
try:
if sys.platform == 'os2emx':
# OS/2 EMX fcntl() not adequate
@ -194,7 +194,7 @@ class Mailbox:
# used in strings and by email.Message are translated here.
"""Dump message contents to target file."""
if isinstance(message, email.message.Message):
buffer = StringIO.StringIO()
buffer = io.StringIO()
gen = email.generator.Generator(buffer, mangle_from_, 0)
gen.flatten(message)
buffer.seek(0)
@ -1141,13 +1141,13 @@ class Babyl(_singlefileMailbox):
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
original_headers = io.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or not line:
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
visible_headers = io.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or not line:
@ -1166,7 +1166,7 @@ class Babyl(_singlefileMailbox):
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
original_headers = io.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or not line:
@ -1182,7 +1182,7 @@ class Babyl(_singlefileMailbox):
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
return StringIO.StringIO(self.get_string(key).replace('\n',
return io.StringIO(self.get_string(key).replace('\n',
os.linesep))
def get_labels(self):
@ -1259,7 +1259,7 @@ class Babyl(_singlefileMailbox):
else:
self._file.write('1,,' + os.linesep)
if isinstance(message, email.message.Message):
orig_buffer = StringIO.StringIO()
orig_buffer = io.StringIO()
orig_generator = email.generator.Generator(orig_buffer, False, 0)
orig_generator.flatten(message)
orig_buffer.seek(0)
@ -1270,7 +1270,7 @@ class Babyl(_singlefileMailbox):
break
self._file.write('*** EOOH ***' + os.linesep)
if isinstance(message, BabylMessage):
vis_buffer = StringIO.StringIO()
vis_buffer = io.StringIO()
vis_generator = email.generator.Generator(vis_buffer, False, 0)
vis_generator.flatten(message.get_visible())
while True:

View File

@ -144,6 +144,7 @@ def choose_boundary():
# Subroutines for decoding some common content-transfer-types
# Input and output must be files opened in binary mode
def decode(input, output, encoding):
"""Decode common content-transfer-encodings (base64, quopri, uuencode)."""
@ -157,7 +158,7 @@ def decode(input, output, encoding):
import uu
return uu.decode(input, output)
if encoding in ('7bit', '8bit'):
return output.write(input.read())
return output.write(input.read().decode("Latin-1"))
if encoding in decodetab:
pipethrough(input, decodetab[encoding], output)
else:

View File

@ -1621,13 +1621,6 @@ class OptionParser (OptionContainer):
result.append(self.format_epilog(formatter))
return "".join(result)
# used by test suite
def _get_encoding(self, file):
encoding = getattr(file, "encoding", None)
if not encoding:
encoding = sys.getdefaultencoding()
return encoding
def print_help(self, file=None):
"""print_help(file : file = stdout)
@ -1636,8 +1629,7 @@ class OptionParser (OptionContainer):
"""
if file is None:
file = sys.stdout
encoding = self._get_encoding(file)
file.write(self.format_help().encode(encoding, "replace"))
file.write(self.format_help())
# class OptionParser

View File

@ -1797,8 +1797,8 @@ def genops(pickle):
is None.
If the pickle has a tell() method, pos was the value of pickle.tell()
before reading the current opcode. If the pickle is a string object,
it's wrapped in a StringIO object, and the latter's tell() result is
before reading the current opcode. If the pickle is a bytes object,
it's wrapped in a BytesIO object, and the latter's tell() result is
used. Else (the pickle doesn't have a tell(), and it's not obvious how
to query its current position) pos is None.
"""

View File

@ -17,7 +17,7 @@ from Carbon import AE
from Carbon.AppleEvents import *
import MacOS
import Carbon.File
import StringIO
import io
import aetypes
from aetypes import mkenum, ObjectSpecifier
import os

View File

@ -12,7 +12,7 @@ import os
import string
import sys
import types
import StringIO
import io
import keyword
import macresource
import aetools
@ -266,7 +266,7 @@ def dumpaetelist(aetelist, output):
def decode(data, verbose=None):
"""Decode a resource into a python data structure"""
f = StringIO.StringIO(data)
f = io.StringIO(data)
aete = generic(getaete, f)
aete = simplify(aete)
processed = f.tell()

View File

@ -36,7 +36,7 @@ saferepr()
import sys as _sys
from StringIO import StringIO as _StringIO
from io import StringIO as _StringIO
__all__ = ["pprint","pformat","isreadable","isrecursive","saferepr",
"PrettyPrinter"]

View File

@ -1804,8 +1804,8 @@ running "hh -decompile . PythonNN.chm" in the C:\PythonNN\Doc> directory.
document = re.sub(addrpat, '', re.sub(divpat, '', file.read()))
file.close()
import htmllib, formatter, StringIO
buffer = StringIO.StringIO()
import htmllib, formatter, io
buffer = io.StringIO()
parser = htmllib.HTMLParser(
formatter.AbstractFormatter(formatter.DumbWriter(buffer)))
parser.start_table = parser.do_p
@ -1816,7 +1816,7 @@ running "hh -decompile . PythonNN.chm" in the C:\PythonNN\Doc> directory.
buffer = replace(buffer.getvalue(), '\xa0', ' ', '\n', '\n ')
pager(' ' + buffer.strip() + '\n')
if xrefs:
buffer = StringIO.StringIO()
buffer = io.StringIO()
formatter.DumbWriter(buffer).send_flowing_data(
'Related help topics: ' + ', '.join(xrefs.split()) + '\n')
self.output.write('\n%s\n' % buffer.getvalue())
@ -1900,9 +1900,9 @@ class ModuleScanner:
else:
loader = importer.find_module(modname)
if hasattr(loader,'get_source'):
import StringIO
import io
desc = source_synopsis(
StringIO.StringIO(loader.get_source(modname))
io.StringIO(loader.get_source(modname))
) or ''
if hasattr(loader,'get_filename'):
path = loader.get_filename(modname)

View File

@ -406,7 +406,7 @@ class AbstractPickleTests(unittest.TestCase):
# is a mystery. cPickle also suppresses PUT for objects with a refcount
# of 1.
def dont_test_disassembly(self):
from cStringIO import StringIO
from io import StringIO
from pickletools import dis
for proto, expected in (0, DATA0_DIS), (1, DATA1_DIS):
@ -951,8 +951,8 @@ class AbstractPickleModuleTests(unittest.TestCase):
self.assertEqual(self.module.HIGHEST_PROTOCOL, 2)
def test_callapi(self):
from cStringIO import StringIO
f = StringIO()
from io import BytesIO
f = BytesIO()
# With and without keyword arguments
self.module.dump(123, f, -1)
self.module.dump(123, file=f, protocol=-1)

View File

@ -125,7 +125,7 @@ import getopt
import random
import warnings
import re
import StringIO
import io
import traceback
# I see no other way to suppress these warnings;
@ -537,7 +537,7 @@ def runtest_inner(test, generate, verbose, quiet,
if verbose:
cfp = None
else:
cfp = StringIO.StringIO() # XXX Should use io.StringIO()
cfp = io.StringIO() # XXX Should use io.StringIO()
try:
save_stdout = sys.stdout

View File

@ -2,13 +2,13 @@
import sys
import unittest
import StringIO
import cStringIO
import io
from test import test_support
class TestGenericStringIO:
# use a class variable MODULE to define which module is being tested
# use a class variable CLASS to define which class is being tested
CLASS = None
# Line of data to test as string
_line = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!'
@ -20,7 +20,7 @@ class TestGenericStringIO:
def setUp(self):
self._line = self.constructor(self._line)
self._lines = self.constructor((self._line + '\n') * 5)
self._fp = self.MODULE.StringIO(self._lines)
self._fp = self.CLASS(self._lines)
def test_reads(self):
eq = self.assertEqual
@ -30,7 +30,7 @@ class TestGenericStringIO:
eq(len(self._fp.readlines(60)), 2)
def test_writes(self):
f = self.MODULE.StringIO()
f = self.CLASS()
self.assertRaises(TypeError, f.seek)
f.write(self._line[:6])
f.seek(3)
@ -39,7 +39,7 @@ class TestGenericStringIO:
self.assertEqual(f.getvalue(), 'abcuvwxyz!')
def test_writelines(self):
f = self.MODULE.StringIO()
f = self.CLASS()
f.writelines([self._line[0], self._line[1], self._line[2]])
f.seek(0)
self.assertEqual(f.getvalue(), 'abc')
@ -48,12 +48,12 @@ class TestGenericStringIO:
def errorGen():
yield 'a'
raise KeyboardInterrupt()
f = self.MODULE.StringIO()
f = self.CLASS()
self.assertRaises(KeyboardInterrupt, f.writelines, errorGen())
def test_truncate(self):
eq = self.assertEqual
f = self.MODULE.StringIO()
f = self.CLASS()
f.write(self._lines)
f.seek(10)
f.truncate()
@ -62,22 +62,22 @@ class TestGenericStringIO:
eq(f.getvalue(), 'abcde')
f.write('xyz')
eq(f.getvalue(), 'abcdexyz')
self.assertRaises(IOError, f.truncate, -1)
self.assertRaises(ValueError, f.truncate, -1)
f.close()
self.assertRaises(ValueError, f.write, 'frobnitz')
def test_closed_flag(self):
f = self.MODULE.StringIO()
f = self.CLASS()
self.assertEqual(f.closed, False)
f.close()
self.assertEqual(f.closed, True)
f = self.MODULE.StringIO(self.constructor("abc"))
f = self.CLASS(self.constructor("abc"))
self.assertEqual(f.closed, False)
f.close()
self.assertEqual(f.closed, True)
def test_isatty(self):
f = self.MODULE.StringIO()
f = self.CLASS()
self.assertRaises(TypeError, f.isatty, None)
self.assertEqual(f.isatty(), False)
f.close()
@ -96,10 +96,10 @@ class TestGenericStringIO:
i += 1
eq(i, 5)
self._fp.close()
self.assertRaises(ValueError, next, self._fp)
self.assertRaises(StopIteration, next, self._fp)
class TestStringIO(TestGenericStringIO, unittest.TestCase):
MODULE = StringIO
class TestioStringIO(TestGenericStringIO, unittest.TestCase):
CLASS = io.StringIO
def test_unicode(self):
@ -109,7 +109,7 @@ class TestStringIO(TestGenericStringIO, unittest.TestCase):
# snippets to larger Unicode strings. This is tested by this
# method. Note that cStringIO does not support this extension.
f = self.MODULE.StringIO()
f = self.CLASS()
f.write(self._line[:6])
f.seek(3)
f.write(str(self._line[20:26]))
@ -118,55 +118,10 @@ class TestStringIO(TestGenericStringIO, unittest.TestCase):
self.assertEqual(s, str('abcuvwxyz!'))
self.assertEqual(type(s), str)
class TestcStringIO(TestGenericStringIO, unittest.TestCase):
MODULE = cStringIO
constructor = str8
def test_unicode(self):
if not test_support.have_unicode: return
# The cStringIO module converts Unicode strings to character
# strings when writing them to cStringIO objects.
# Check that this works.
f = self.MODULE.StringIO()
f.write(str(self._line[:5]))
s = f.getvalue()
self.assertEqual(s, 'abcde')
self.assertEqual(type(s), str8)
f = self.MODULE.StringIO(str(self._line[:5]))
s = f.getvalue()
self.assertEqual(s, 'abcde')
self.assertEqual(type(s), str8)
# XXX This no longer fails -- the default encoding is always UTF-8.
##self.assertRaises(UnicodeDecodeError, self.MODULE.StringIO, '\xf4')
class TestBufferStringIO(TestStringIO):
def constructor(self, s):
return buffer(str8(s))
class TestBuffercStringIO(TestcStringIO):
def constructor(self, s):
return buffer(str8(s))
def test_main():
classes = [
TestStringIO,
TestcStringIO,
]
if not sys.platform.startswith('java'):
classes.extend([
TestBufferStringIO,
TestBuffercStringIO
])
test_support.run_unittest(*classes)
test_support.run_unittest(TestioStringIO)
if __name__ == '__main__':
unittest.main()
test_main()

View File

@ -36,7 +36,6 @@ class AllTest(unittest.TestCase):
self.check_all("Queue")
self.check_all("SimpleHTTPServer")
self.check_all("SocketServer")
self.check_all("StringIO")
self.check_all("UserString")
self.check_all("aifc")
self.check_all("base64")

View File

@ -1,6 +1,6 @@
import sys
import unittest
import StringIO
import io
import atexit
from test import test_support
@ -25,7 +25,7 @@ def raise2():
class TestCase(unittest.TestCase):
def setUp(self):
self.stream = StringIO.StringIO()
self.stream = io.StringIO()
sys.stdout = sys.stderr = self.stream
atexit._clear()

View File

@ -36,23 +36,23 @@ class LegacyBase64TestCase(unittest.TestCase):
def test_encode(self):
eq = self.assertEqual
from cStringIO import StringIO
infp = StringIO('abcdefghijklmnopqrstuvwxyz'
'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'0123456789!@#0^&*();:<>,. []{}')
outfp = StringIO()
from io import BytesIO
infp = BytesIO(b'abcdefghijklmnopqrstuvwxyz'
b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
b'0123456789!@#0^&*();:<>,. []{}')
outfp = BytesIO()
base64.encode(infp, outfp)
eq(outfp.getvalue(),
'YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE'
'RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT'
'Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n')
b'YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE'
b'RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT'
b'Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n')
def test_decode(self):
from cStringIO import StringIO
infp = StringIO('d3d3LnB5dGhvbi5vcmc=')
outfp = StringIO()
from io import BytesIO
infp = BytesIO(b'd3d3LnB5dGhvbi5vcmc=')
outfp = BytesIO()
base64.decode(infp, outfp)
self.assertEqual(outfp.getvalue(), 'www.python.org')
self.assertEqual(outfp.getvalue(), b'www.python.org')

View File

@ -5,7 +5,7 @@ from test.test_support import fcmp, TESTFN, unlink, run_unittest, \
run_with_locale
from operator import neg
import sys, warnings, cStringIO, random, UserDict
import sys, warnings, random, UserDict, io
warnings.filterwarnings("ignore", "hex../oct.. of negative int",
FutureWarning, __name__)
warnings.filterwarnings("ignore", "integer argument expected",
@ -1455,11 +1455,11 @@ class BuiltinTest(unittest.TestCase):
self.assertRaises(ValueError, input)
sys.stdout = BitBucket()
sys.stdin = cStringIO.StringIO("NULL\0")
sys.stdin = io.StringIO("NULL\0")
self.assertRaises(TypeError, input, 42, 42)
sys.stdin = cStringIO.StringIO(" 'whitespace'")
sys.stdin = io.StringIO(" 'whitespace'")
self.assertEqual(input(), " 'whitespace'")
sys.stdin = cStringIO.StringIO()
sys.stdin = io.StringIO()
self.assertRaises(EOFError, input)
del sys.stdout

View File

@ -3,7 +3,7 @@ from test import test_support
from test.test_support import TESTFN
import unittest
from cStringIO import StringIO
from io import BytesIO
import os
import subprocess
import sys
@ -98,7 +98,7 @@ class BZ2FileTest(BaseTest):
self.createTempFile()
bz2f = BZ2File(self.filename)
self.assertRaises(TypeError, bz2f.readline, None)
sio = StringIO(self.TEXT)
sio = BytesIO(self.TEXT)
for line in sio.readlines():
self.assertEqual(bz2f.readline(), line)
bz2f.close()
@ -108,7 +108,7 @@ class BZ2FileTest(BaseTest):
self.createTempFile()
bz2f = BZ2File(self.filename)
self.assertRaises(TypeError, bz2f.readlines, None)
sio = StringIO(self.TEXT)
sio = BytesIO(self.TEXT)
self.assertEqual(bz2f.readlines(), sio.readlines())
bz2f.close()
@ -116,7 +116,7 @@ class BZ2FileTest(BaseTest):
# "Test iter(BZ2File)"
self.createTempFile()
bz2f = BZ2File(self.filename)
sio = StringIO(self.TEXT)
sio = BytesIO(self.TEXT)
self.assertEqual(list(iter(bz2f)), sio.readlines())
bz2f.close()
@ -149,7 +149,7 @@ class BZ2FileTest(BaseTest):
# "Test BZ2File.writelines()"
bz2f = BZ2File(self.filename, "w")
self.assertRaises(TypeError, bz2f.writelines)
sio = StringIO(self.TEXT)
sio = BytesIO(self.TEXT)
bz2f.writelines(sio.readlines())
bz2f.close()
# patch #1535500

View File

@ -1,5 +1,5 @@
import ConfigParser
import StringIO
import io
import unittest
import UserDict
@ -30,7 +30,7 @@ class TestCaseBase(unittest.TestCase):
def fromstring(self, string, defaults=None):
cf = self.newconfig(defaults)
sio = StringIO.StringIO(string)
sio = io.StringIO(string)
cf.readfp(sio)
return cf
@ -156,7 +156,7 @@ class TestCaseBase(unittest.TestCase):
"No Section!\n")
def parse_error(self, exc, src):
sio = StringIO.StringIO(src)
sio = io.StringIO(src)
self.assertRaises(exc, self.cf.readfp, sio)
def test_query_errors(self):
@ -222,7 +222,7 @@ class TestCaseBase(unittest.TestCase):
"foo: another very\n"
" long line"
)
output = StringIO.StringIO()
output = io.StringIO()
cf.write(output)
self.assertEqual(
output.getvalue(),
@ -449,7 +449,7 @@ class SortedTestCase(RawConfigParserTestCase):
"o1=4\n"
"[a]\n"
"k=v\n")
output = StringIO.StringIO()
output = io.StringIO()
self.cf.write(output)
self.assertEquals(output.getvalue(),
"[a]\n"

View File

@ -4,7 +4,7 @@ import os
import sys
import tempfile
import unittest
from StringIO import StringIO
from io import StringIO
class HackedSysModule:
# The regression test will have real values in sys.argv, which
@ -15,9 +15,9 @@ class HackedSysModule:
cgi.sys = HackedSysModule()
try:
from cStringIO import StringIO
from io import StringIO
except ImportError:
from StringIO import StringIO
from io import StringIO
class ComparableException:
def __init__(self, err):

View File

@ -6,10 +6,10 @@ import unittest
from test.test_support import run_unittest, is_jython
from codeop import compile_command, PyCF_DONT_IMPLY_DEDENT
import io
if is_jython:
import sys
import cStringIO
def unify_callables(d):
for n,v in d.items():
@ -27,7 +27,7 @@ class CodeopTests(unittest.TestCase):
if symbol == "single":
d,r = {},{}
saved_stdout = sys.stdout
sys.stdout = cStringIO.StringIO()
sys.stdout = io.StringIO()
try:
exec(code, d)
exec(compile(str,"<input>","single"), r)

View File

@ -153,8 +153,8 @@ class HeaderTests(TestCase):
try:
result = split_header_words([arg])
except:
import traceback, StringIO
f = StringIO.StringIO()
import traceback, io
f = io.StringIO()
traceback.print_exc(None, f)
result = "(error -- traceback follows)\n\n%s" % f.getvalue()
self.assertEquals(result, expect, """
@ -204,8 +204,8 @@ class FakeResponse:
"""
headers: list of RFC822-style 'Key: value' strings
"""
import mimetools, StringIO
f = StringIO.StringIO("\n".join(headers))
import mimetools, io
f = io.StringIO("\n".join(headers))
self._headers = mimetools.Message(f)
self._url = url
def info(self): return self._headers

View File

@ -5,7 +5,7 @@
import sys
import os
import unittest
from StringIO import StringIO
from io import StringIO
from tempfile import TemporaryFile
import csv
import gc

View File

@ -4,7 +4,7 @@ from test import test_support, seq_tests
from weakref import proxy
import copy
import pickle
from cStringIO import StringIO
from io import StringIO
import random
import os

View File

@ -1,7 +1,7 @@
import unittest
from test import test_support
import sys, UserDict, cStringIO
import sys, UserDict
class DictTest(unittest.TestCase):

View File

@ -4,7 +4,7 @@ from test.test_support import verbose, run_unittest
import unittest
import sys
import dis
import StringIO
import io
def _f(a):
@ -103,7 +103,7 @@ Disassembly of g:
class DisTests(unittest.TestCase):
def do_disassembly_test(self, func, expected):
s = StringIO.StringIO()
s = io.StringIO()
save_stdout = sys.stdout
sys.stdout = s
dis.dis(func)

View File

@ -2405,7 +2405,7 @@ def test_main():
from test import test_doctest
test_support.run_doctest(test_doctest, verbosity=True)
import trace, sys, re, StringIO
import trace, sys, re, io
def test_coverage(coverdir):
tracer = trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix,],
trace=0, count=1)

View File

@ -7,7 +7,7 @@ import unittest
from test.test_support import verbose, TESTFN, run_unittest
from test.test_support import unlink as safe_unlink
import sys, os, re
from StringIO import StringIO
from io import StringIO
from fileinput import FileInput, hook_encoded
# The fileinput module has 2 interfaces: the FileInput class which does

View File

@ -1685,8 +1685,8 @@ RuntimeError: generator ignored GeneratorExit
Our ill-behaved code should be invoked during GC:
>>> import sys, StringIO
>>> old, sys.stderr = sys.stderr, StringIO.StringIO()
>>> import sys, io
>>> old, sys.stderr = sys.stderr, io.StringIO()
>>> g = f()
>>> next(g)
>>> del g
@ -1796,10 +1796,10 @@ explicitly, without generators. We do have to redirect stderr to avoid
printing warnings and to doublecheck that we actually tested what we wanted
to test.
>>> import sys, StringIO
>>> import sys, io
>>> old = sys.stderr
>>> try:
... sys.stderr = StringIO.StringIO()
... sys.stderr = io.StringIO()
... class Leaker:
... def __del__(self):
... raise RuntimeError

View File

@ -25,7 +25,7 @@ Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
"""
import select
import os, sys, struct, pickle, cStringIO
import os, sys, struct, pickle, io
import socket, tempfile, threading, time
import logging, logging.handlers, logging.config
from test.test_support import run_with_locale
@ -606,7 +606,7 @@ def test_main_inner():
#Configure the logger for logrecv so events do not propagate beyond it.
#The sockLogger output is buffered in memory until the end of the test,
#and printed at the end.
sockOut = cStringIO.StringIO()
sockOut = io.StringIO()
sockLogger = logging.getLogger("logrecv")
sockLogger.setLevel(logging.DEBUG)
sockhdlr = logging.StreamHandler(sockOut)

View File

@ -7,7 +7,7 @@ import email
import email.message
import rfc822
import re
import StringIO
import io
from test import test_support
import unittest
import mailbox
@ -69,7 +69,7 @@ class TestMailbox(TestBase):
self.assertEqual(len(self._box), 2)
keys.append(self._box.add(email.message_from_string(_sample_message)))
self.assertEqual(len(self._box), 3)
keys.append(self._box.add(StringIO.StringIO(_sample_message)))
keys.append(self._box.add(io.StringIO(_sample_message)))
self.assertEqual(len(self._box), 4)
keys.append(self._box.add(_sample_message))
self.assertEqual(len(self._box), 5)
@ -400,12 +400,12 @@ class TestMailbox(TestBase):
def test_dump_message(self):
# Write message representations to disk
for input in (email.message_from_string(_sample_message),
_sample_message, StringIO.StringIO(_sample_message)):
output = StringIO.StringIO()
_sample_message, io.StringIO(_sample_message)):
output = io.StringIO()
self._box._dump_message(input, output)
self.assert_(output.getvalue() ==
_sample_message.replace('\n', os.linesep))
output = StringIO.StringIO()
output = io.StringIO()
self.assertRaises(TypeError,
lambda: self._box._dump_message(None, output))

View File

@ -8,7 +8,8 @@
import unittest
from test.test_support import run_unittest, TESTFN, TestSkipped
import os, StringIO
import os
import io
import sys
import mhlib
@ -262,7 +263,7 @@ class MhlibTests(unittest.TestCase):
f = mh.openfolder('dummy1')
def create(n):
msg = "From: foo\nSubject: %s\n\nDummy Message %s\n" % (n,n)
f.createmessage(n, StringIO.StringIO(msg))
f.createmessage(n, io.StringIO(msg))
create(7)
create(8)

View File

@ -1,28 +1,54 @@
import unittest
from test import test_support
import string, StringIO, mimetools
import string, mimetools
import io
msgtext1 = mimetools.Message(StringIO.StringIO(
msgtext1 = mimetools.Message(io.StringIO(
"""Content-Type: text/plain; charset=iso-8859-1; format=flowed
Content-Transfer-Encoding: 8bit
Foo!
"""))
sample = bytes(string.ascii_letters + "=" + string.digits + "\n", "ASCII")
class MimeToolsTest(unittest.TestCase):
def test_decodeencode(self):
start = string.ascii_letters + "=" + string.digits + "\n"
for enc in ['7bit','8bit','base64','quoted-printable',
'uuencode', 'x-uuencode', 'uue', 'x-uue']:
i = StringIO.StringIO(start)
o = StringIO.StringIO()
mimetools.encode(i, o, enc)
i = StringIO.StringIO(o.getvalue())
o = StringIO.StringIO()
mimetools.decode(i, o, enc)
self.assertEqual(o.getvalue(), start)
def decode_encode_test(self, enc):
i = io.BytesIO(sample)
o = io.BytesIO()
mimetools.encode(i, o, enc)
i = io.BytesIO(o.getvalue())
o = io.BytesIO()
mimetools.decode(i, o, enc)
self.assertEqual(o.getvalue(), sample)
# Separate tests for better diagnostics
def test_7bit(self):
self.decode_encode_test('7bit')
def test_8bit(self):
self.decode_encode_test('8bit')
def test_base64(self):
self.decode_encode_test('base64')
def test_quoted_printable(self):
self.decode_encode_test('quoted-printable')
def test_uuencode(self):
self.decode_encode_test('uuencode')
def test_x_uuencode(self):
self.decode_encode_test('x-uuencode')
def test_uue(self):
self.decode_encode_test('uue')
def test_x_uue(self):
self.decode_encode_test('x-uue')
def test_boundary(self):
s = set([""])
@ -32,7 +58,7 @@ class MimeToolsTest(unittest.TestCase):
s.add(nb)
def test_message(self):
msg = mimetools.Message(StringIO.StringIO(msgtext1))
msg = mimetools.Message(io.StringIO(msgtext1))
self.assertEqual(msg.gettype(), "text/plain")
self.assertEqual(msg.getmaintype(), "text")
self.assertEqual(msg.getsubtype(), "plain")

View File

@ -1,5 +1,5 @@
import mimetypes
import StringIO
import io
import unittest
from test import test_support
@ -30,7 +30,7 @@ class MimeTypesTestCase(unittest.TestCase):
def test_file_parsing(self):
eq = self.assertEqual
sio = StringIO.StringIO("x-application/x-unittest pyunit\n")
sio = io.StringIO("x-application/x-unittest pyunit\n")
self.db.readfp(sio)
eq(self.db.guess_type("foo.pyunit"),
("x-application/x-unittest", None))

View File

@ -4,7 +4,7 @@ import os
import sys
import pickle
import traceback
from StringIO import StringIO
from io import StringIO
from test.test_support import verbose, run_unittest, TestSkipped
import unittest

View File

@ -1,6 +1,6 @@
import mimetools
import multifile
import cStringIO
import io
msg = """Mime-Version: 1.0
Content-Type: multipart/mixed;
@ -57,7 +57,7 @@ def test_main():
global boundaries, linecount
boundaries = 0
linecount = 0
f = cStringIO.StringIO(msg)
f = io.StringIO(msg)
getMIMEMsg(multifile.MultiFile(f))
assert boundaries == 2
assert linecount == 9

View File

@ -14,7 +14,7 @@ import re
import copy
import unittest
from StringIO import StringIO
from io import StringIO
from pprint import pprint
from test import test_support
@ -157,12 +157,9 @@ and kwargs %(kwargs)r
expected_error=None):
"""Assert the parser prints the expected output on stdout."""
save_stdout = sys.stdout
encoding = getattr(save_stdout, 'encoding', None)
try:
try:
sys.stdout = StringIO()
if encoding:
sys.stdout.encoding = encoding
self.parser.parse_args(cmdline_args)
finally:
output = sys.stdout.getvalue()

View File

@ -1,6 +1,6 @@
import dis
import sys
from cStringIO import StringIO
from io import StringIO
import unittest
def disassemble(func):

View File

@ -4,9 +4,9 @@ import unittest
from test import test_support
try:
from cStringIO import StringIO
from io import StringIO
except ImportError:
from StringIO import StringIO
from io import StringIO
class MessageTestCase(unittest.TestCase):

View File

@ -1,4 +1,5 @@
import unittest, StringIO, robotparser
import unittest, robotparser
import io
from test import test_support
class RobotTestCase(unittest.TestCase):
@ -32,7 +33,7 @@ tests = unittest.TestSuite()
def RobotTest(index, robots_txt, good_urls, bad_urls,
agent="test_robotparser"):
lines = StringIO.StringIO(robots_txt).readlines()
lines = io.StringIO(robots_txt).readlines()
parser = robotparser.RobotFileParser()
parser.parse(lines)
for url in good_urls:

View File

@ -12,7 +12,7 @@ from xml.sax.saxutils import XMLGenerator, escape, unescape, quoteattr, \
XMLFilterBase
from xml.sax.expatreader import create_parser
from xml.sax.xmlreader import InputSource, AttributesImpl, AttributesNSImpl
from cStringIO import StringIO
from io import StringIO
from test.test_support import findfile, run_unittest
import unittest
import os

View File

@ -1,13 +1,13 @@
# -*- coding: iso-8859-1 -*-
import unittest, test.test_support
import sys, cStringIO
import sys, io
class SysModuleTest(unittest.TestCase):
def test_original_displayhook(self):
import __builtin__
savestdout = sys.stdout
out = cStringIO.StringIO()
out = io.StringIO()
sys.stdout = out
dh = sys.__displayhook__
@ -46,7 +46,7 @@ class SysModuleTest(unittest.TestCase):
def test_original_excepthook(self):
savestderr = sys.stderr
err = cStringIO.StringIO()
err = io.StringIO()
sys.stderr = err
eh = sys.__excepthook__

View File

@ -3,7 +3,7 @@ import os
import io
import shutil
import tempfile
import StringIO
import io
from hashlib import md5
import errno
@ -897,15 +897,15 @@ class AppendTest(unittest.TestCase):
self._test()
def test_empty_fileobj(self):
fobj = StringIO.StringIO()
fobj = io.BytesIO()
self._add_testfile(fobj)
fobj.seek(0)
self._test(fileobj=fobj)
def test_fileobj(self):
self._create_testtar()
data = open(self.tarname).read()
fobj = StringIO.StringIO(data)
data = open(self.tarname, "rb").read()
fobj = io.BytesIO(data)
self._add_testfile(fobj)
fobj.seek(0)
self._test(names=["foo", "bar"], fileobj=fobj)

View File

@ -22,7 +22,7 @@ import tempfile
from test.test_support import threading_setup, threading_cleanup, run_unittest
import unittest
import StringIO
import io
from traceback import print_exc
startEvent = threading.Event()
@ -32,7 +32,7 @@ class TempFileGreedy(threading.Thread):
ok_count = 0
def run(self):
self.errors = StringIO.StringIO()
self.errors = io.StringIO()
startEvent.wait()
for i in range(FILES_PER_THREAD):
try:

View File

@ -2,7 +2,7 @@ import unittest
from test import test_support
import os, socket
import StringIO
import io
import urllib2
from urllib2 import Request, OpenerDirector
@ -236,9 +236,9 @@ class MockHeaders(dict):
def getheaders(self, name):
return list(self.values())
class MockResponse(StringIO.StringIO):
class MockResponse(io.StringIO):
def __init__(self, code, msg, headers, data, url=None):
StringIO.StringIO.__init__(self, data)
io.StringIO.__init__(self, data)
self.code, self.msg, self.headers, self.url = code, msg, headers, url
def info(self):
return self.headers
@ -353,7 +353,7 @@ class MockHTTPHandler(urllib2.BaseHandler):
self.requests = []
def http_open(self, req):
import mimetools, httplib, copy
from StringIO import StringIO
from io import StringIO
self.requests.append(copy.deepcopy(req))
if self._count == 0:
self._count = self._count + 1
@ -546,7 +546,7 @@ class HandlerTests(unittest.TestCase):
def __init__(self, data): self.data = data
def retrfile(self, filename, filetype):
self.filename, self.filetype = filename, filetype
return StringIO.StringIO(self.data), len(self.data)
return io.StringIO(self.data), len(self.data)
class NullFTPHandler(urllib2.FTPHandler):
def __init__(self, data): self.data = data

View File

@ -6,47 +6,50 @@ Nick Mathewson
import unittest
from test import test_support
import sys, os, uu, cStringIO
import sys, os
import uu
from StringIO import StringIO
from io import BytesIO
import io
plaintext = "The smooth-scaled python crept over the sleeping dog\n"
plaintext = b"The smooth-scaled python crept over the sleeping dog\n"
encodedtext = """\
encodedtext = b"""\
M5&AE('-M;V]T:\"US8V%L960@<'ET:&]N(&-R97!T(&]V97(@=&AE('-L965P
(:6YG(&1O9PH """
encodedtextwrapped = "begin %03o %s\n" + encodedtext.replace("%", "%%") + "\n \nend\n"
def encodedtextwrapped(mode, filename):
return (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
encodedtext + b"\n \nend\n")
class UUTest(unittest.TestCase):
def test_encode(self):
inp = cStringIO.StringIO(plaintext)
out = cStringIO.StringIO()
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1")
self.assertEqual(out.getvalue(), encodedtextwrapped % (0o666, "t1"))
inp = cStringIO.StringIO(plaintext)
out = cStringIO.StringIO()
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1"))
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1", 0o644)
self.assertEqual(out.getvalue(), encodedtextwrapped % (0o644, "t1"))
self.assertEqual(out.getvalue(), encodedtextwrapped(0o644, "t1"))
def test_decode(self):
inp = cStringIO.StringIO(encodedtextwrapped % (0o666, "t1"))
out = cStringIO.StringIO()
inp = io.BytesIO(encodedtextwrapped(0o666, "t1"))
out = io.BytesIO()
uu.decode(inp, out)
self.assertEqual(out.getvalue(), plaintext)
inp = cStringIO.StringIO(
"UUencoded files may contain many lines,\n" +
"even some that have 'begin' in them.\n" +
encodedtextwrapped % (0o666, "t1")
inp = io.BytesIO(
b"UUencoded files may contain many lines,\n" +
b"even some that have 'begin' in them.\n" +
encodedtextwrapped(0o666, "t1")
)
out = cStringIO.StringIO()
out = io.BytesIO()
uu.decode(inp, out)
self.assertEqual(out.getvalue(), plaintext)
def test_truncatedinput(self):
inp = cStringIO.StringIO("begin 644 t1\n" + encodedtext)
out = cStringIO.StringIO()
inp = io.BytesIO(b"begin 644 t1\n" + encodedtext)
out = io.BytesIO()
try:
uu.decode(inp, out)
self.fail("No exception thrown")
@ -54,8 +57,8 @@ class UUTest(unittest.TestCase):
self.assertEqual(str(e), "Truncated input file")
def test_missingbegin(self):
inp = cStringIO.StringIO("")
out = cStringIO.StringIO()
inp = io.BytesIO(b"")
out = io.BytesIO()
try:
uu.decode(inp, out)
self.fail("No exception thrown")
@ -73,24 +76,27 @@ class UUStdIOTest(unittest.TestCase):
sys.stdout = self.stdout
def test_encode(self):
sys.stdin = cStringIO.StringIO(plaintext)
sys.stdout = cStringIO.StringIO()
sys.stdin = io.StringIO(plaintext.decode("ascii"))
sys.stdout = io.StringIO()
uu.encode("-", "-", "t1", 0o666)
self.assertEqual(
sys.stdout.getvalue(),
encodedtextwrapped % (0o666, "t1")
)
self.assertEqual(sys.stdout.getvalue(),
encodedtextwrapped(0o666, "t1").decode("ascii"))
def test_decode(self):
sys.stdin = cStringIO.StringIO(encodedtextwrapped % (0o666, "t1"))
sys.stdout = cStringIO.StringIO()
sys.stdin = io.StringIO(encodedtextwrapped(0o666, "t1").decode("ascii"))
sys.stdout = io.StringIO()
uu.decode("-", "-")
self.assertEqual(sys.stdout.getvalue(), plaintext)
stdout = sys.stdout
sys.stdout = self.stdout
sys.stdin = self.stdin
self.assertEqual(stdout.getvalue(), plaintext.decode("ascii"))
class UUFileTest(unittest.TestCase):
def _kill(self, f):
# close and remove file
if f is None:
return
try:
f.close()
except (SystemExit, KeyboardInterrupt):
@ -113,44 +119,46 @@ class UUFileTest(unittest.TestCase):
del self.tmpout
def test_encode(self):
fin = fout = None
try:
fin = open(self.tmpin, 'wb')
fin.write(plaintext)
fin.close()
fin = open(self.tmpin, 'rb')
fout = open(self.tmpout, 'w')
fout = open(self.tmpout, 'wb')
uu.encode(fin, fout, self.tmpin, mode=0o644)
fin.close()
fout.close()
fout = open(self.tmpout, 'r')
fout = open(self.tmpout, 'rb')
s = fout.read()
fout.close()
self.assertEqual(s, encodedtextwrapped % (0o644, self.tmpin))
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
# in_file and out_file as filenames
uu.encode(self.tmpin, self.tmpout, self.tmpin, mode=0o644)
fout = open(self.tmpout, 'r')
fout = open(self.tmpout, 'rb')
s = fout.read()
fout.close()
self.assertEqual(s, encodedtextwrapped % (0o644, self.tmpin))
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
finally:
self._kill(fin)
self._kill(fout)
def test_decode(self):
f = None
try:
f = open(self.tmpin, 'w')
f.write(encodedtextwrapped % (0o644, self.tmpout))
f = open(self.tmpin, 'wb')
f.write(encodedtextwrapped(0o644, self.tmpout))
f.close()
f = open(self.tmpin, 'r')
f = open(self.tmpin, 'rb')
uu.decode(f)
f.close()
f = open(self.tmpout, 'r')
f = open(self.tmpout, 'rb')
s = f.read()
f.close()
self.assertEqual(s, plaintext)
@ -160,21 +168,25 @@ class UUFileTest(unittest.TestCase):
def test_decodetwice(self):
# Verify that decode() will refuse to overwrite an existing file
f = None
try:
f = cStringIO.StringIO(encodedtextwrapped % (0o644, self.tmpout))
f = io.BytesIO(encodedtextwrapped(0o644, self.tmpout))
f = open(self.tmpin, 'r')
f = open(self.tmpin, 'rb')
uu.decode(f)
f.close()
f = open(self.tmpin, 'r')
f = open(self.tmpin, 'rb')
self.assertRaises(uu.Error, uu.decode, f)
f.close()
finally:
self._kill(f)
def test_main():
test_support.run_unittest(UUTest, UUStdIOTest, UUFileTest)
test_support.run_unittest(UUTest,
UUStdIOTest,
UUFileTest,
)
if __name__=="__main__":
test_main()

View File

@ -39,14 +39,11 @@ def check_method(method):
if not hasattr(method, '__call__'):
print(method, "not callable")
def serialize(ET, elem, encoding=None):
import StringIO
file = StringIO.StringIO()
def serialize(ET, elem):
import io
tree = ET.ElementTree(elem)
if encoding:
tree.write(file, encoding)
else:
tree.write(file)
file = io.StringIO()
tree.write(file)
return file.getvalue()
def summarize(elem):

View File

@ -37,14 +37,11 @@ def check_method(method):
if not hasattr(method, '__call__'):
print(method, "not callable")
def serialize(ET, elem, encoding=None):
import StringIO
file = StringIO.StringIO()
def serialize(ET, elem):
import io
file = io.StringIO()
tree = ET.ElementTree(elem)
if encoding:
tree.write(file, encoding)
else:
tree.write(file)
tree.write(file)
return file.getvalue()
def summarize(elem):

View File

@ -20,7 +20,7 @@ import zipfile, os, unittest
import time
import sys
from StringIO import StringIO
from io import StringIO
from tempfile import TemporaryFile
from test.test_support import TESTFN, run_unittest

View File

@ -15,7 +15,7 @@ import zipimport
import linecache
import doctest
import inspect
import StringIO
import io
from traceback import extract_tb, extract_stack, print_tb
raise_src = 'def do_raise(): raise TypeError\n'
@ -314,7 +314,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
self.assertEqual(line, raise_src.strip())
s = StringIO.StringIO()
s = io.StringIO()
print_tb(tb, 1, s)
self.failUnless(s.getvalue().endswith(raise_src))
else:

View File

@ -45,7 +45,7 @@ def encode(in_file, out_file, name=None, mode=None):
# If in_file is a pathname open it and change defaults
#
if in_file == '-':
in_file = sys.stdin
in_file = sys.stdin.buffer
elif isinstance(in_file, basestring):
if name is None:
name = os.path.basename(in_file)
@ -59,9 +59,9 @@ def encode(in_file, out_file, name=None, mode=None):
# Open out_file if it is a pathname
#
if out_file == '-':
out_file = sys.stdout
out_file = sys.stdout.buffer
elif isinstance(out_file, basestring):
out_file = open(out_file, 'w')
out_file = open(out_file, 'wb')
#
# Set defaults for name and mode
#
@ -86,9 +86,9 @@ def decode(in_file, out_file=None, mode=None, quiet=0):
# Open the input file, if needed.
#
if in_file == '-':
in_file = sys.stdin
in_file = sys.stdin.buffer
elif isinstance(in_file, basestring):
in_file = open(in_file)
in_file = open(in_file, 'rb')
#
# Read until a begin is encountered or we've exhausted the file
#
@ -96,17 +96,18 @@ def decode(in_file, out_file=None, mode=None, quiet=0):
hdr = in_file.readline()
if not hdr:
raise Error('No valid begin line found in input file')
if not hdr.startswith('begin'):
if not hdr.startswith(b'begin'):
continue
hdrfields = hdr.split(' ', 2)
if len(hdrfields) == 3 and hdrfields[0] == 'begin':
hdrfields = hdr.split(b' ', 2)
if len(hdrfields) == 3 and hdrfields[0] == b'begin':
try:
int(hdrfields[1], 8)
break
except ValueError:
pass
if out_file is None:
out_file = hdrfields[2].rstrip()
# If the filename isn't ASCII, what's up with that?!?
out_file = hdrfields[2].rstrip(b' \t\r\n\f').decode("ascii")
if os.path.exists(out_file):
raise Error('Cannot overwrite existing file: %s' % out_file)
if mode is None:
@ -116,7 +117,7 @@ def decode(in_file, out_file=None, mode=None, quiet=0):
#
opened = False
if out_file == '-':
out_file = sys.stdout
out_file = sys.stdout.buffer
elif isinstance(out_file, basestring):
fp = open(out_file, 'wb')
try:
@ -129,12 +130,12 @@ def decode(in_file, out_file=None, mode=None, quiet=0):
# Main decoding loop
#
s = in_file.readline()
while s and s.strip() != 'end':
while s and s.strip(b' \t\r\n\f') != b'end':
try:
data = binascii.a2b_uu(s)
except binascii.Error as v:
# Workaround for broken uuencoders by /Fredrik Lundh
nbytes = (((ord(s[0])-32) & 63) * 4 + 5) // 3
nbytes = (((s[0]-32) & 63) * 4 + 5) // 3
data = binascii.a2b_uu(s[:nbytes])
if not quiet:
sys.stderr.write("Warning: %s\n" % v)
@ -158,8 +159,9 @@ def test():
parser.error('incorrect number of arguments')
sys.exit(1)
input = sys.stdin
output = sys.stdout
# Use the binary streams underlying stdin/stdout
input = sys.stdin.buffer
output = sys.stdout.buffer
if len(args) > 0:
input = args[0]
if len(args) > 1:
@ -168,7 +170,7 @@ def test():
if options.decode:
if options.text:
if isinstance(output, basestring):
output = open(output, 'w')
output = open(output, 'wb')
else:
print(sys.argv[0], ': cannot do -t to stdout')
sys.exit(1)
@ -176,7 +178,7 @@ def test():
else:
if options.text:
if isinstance(input, basestring):
input = open(input, 'r')
input = open(input, 'rb')
else:
print(sys.argv[0], ': cannot do -t from stdin')
sys.exit(1)

View File

@ -335,9 +335,9 @@ def parse(stream_or_string, parser=None, bufsize=None):
def parseString(string, parser=None):
try:
from cStringIO import StringIO
from io import StringIO
except ImportError:
from StringIO import StringIO
from io import StringIO
bufsize = len(string)
buf = StringIO(string)

View File

@ -625,15 +625,16 @@ class ElementTree:
# Writes the element tree to a file, as XML.
#
# @param file A file name, or a file object opened for writing.
# @param encoding Optional output encoding (default is US-ASCII).
# @param encoding Optional output encoding (default is None)
def write(self, file, encoding="us-ascii"):
def write(self, file, encoding=None):
assert self._root is not None
if not hasattr(file, "write"):
file = open(file, "wb")
if not encoding:
encoding = "us-ascii"
elif encoding != "utf-8" and encoding != "us-ascii":
if encoding:
file = open(file, "wb")
else:
file = open(file, "w")
if encoding and encoding != "utf-8":
file.write(_encode("<?xml version='1.0' encoding='%s'?>\n" % encoding, encoding))
self._write(file, self._root, encoding, {})
@ -720,10 +721,10 @@ def dump(elem):
sys.stdout.write("\n")
def _encode(s, encoding):
try:
if encoding:
return s.encode(encoding)
except AttributeError:
return s # 1.5.2: assume the string uses the right encoding
else:
return s
_escape = re.compile(r"[&<>\"\u0080-\uffff]+")
@ -954,10 +955,11 @@ fromstring = XML
##
# Generates a string representation of an XML element, including all
# subelements.
# subelements. If encoding is None, the return type is a string;
# otherwise it is a bytes array.
#
# @param element An Element instance.
# @return An encoded string containing the XML data.
# @return An (optionally) encoded string containing the XML data.
# @defreturn string
def tostring(element, encoding=None):
@ -967,7 +969,10 @@ def tostring(element, encoding=None):
file = dummy()
file.write = data.append
ElementTree(element).write(file, encoding)
return b"".join(data)
if encoding:
return b"".join(data)
else:
return "".join(data)
##
# Generic element structure builder. This builder converts a sequence

View File

@ -33,10 +33,7 @@ def parse(source, handler, errorHandler=ErrorHandler()):
parser.parse(source)
def parseString(string, handler, errorHandler=ErrorHandler()):
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from io import BytesIO
if errorHandler is None:
errorHandler = ErrorHandler()
@ -45,7 +42,7 @@ def parseString(string, handler, errorHandler=ErrorHandler()):
parser.setErrorHandler(errorHandler)
inpsrc = InputSource()
inpsrc.setByteStream(StringIO(string))
inpsrc.setByteStream(BytesIO(string))
parser.parse(inpsrc)
# this is the parser list used by the make_parser function if no

View File

@ -230,10 +230,6 @@ class PyBuildExt(build_ext):
# Fred Drake's interface to the Python parser
exts.append( Extension('parser', ['parsermodule.c']) )
# cStringIO and cPickle
exts.append( Extension('cStringIO', ['cStringIO.c']) )
exts.append( Extension('cPickle', ['cPickle.c']) )
# Memory-mapped files (also works on Win32).
exts.append( Extension('mmap', ['mmapmodule.c']) )

View File

@ -14,7 +14,7 @@ class cStringIO(Module):
Usage:
from cStringIO import StringIO
from io import StringIO
an_output_stream = StringIO()
an_output_stream.write(some_stuff)

View File

@ -109,7 +109,7 @@ __version__ = "$Revision$"
import sys
import os
from types import *
import StringIO
import io
import getopt
import pickle
@ -721,12 +721,12 @@ class Page:
return infos
class MyStringIO(StringIO.StringIO):
class MyStringIO(io.StringIO):
def __init__(self, url, info):
self.__url = url
self.__info = info
StringIO.StringIO.__init__(self)
super(MyStringIO, self).__init__(self)
def info(self):
return self.__info

View File

@ -460,9 +460,6 @@ class PyBuildExt(build_ext):
# Fred Drake's interface to the Python parser
exts.append( Extension('parser', ['parsermodule.c']) )
# cStringIO
exts.append( Extension('cStringIO', ['cStringIO.c']) )
# Memory-mapped files (also works on Win32).
if platform not in ['atheos', 'mac']:
exts.append( Extension('mmap', ['mmapmodule.c']) )