Use sequence repetition instead of bytes constructor with integer argument.
This commit is contained in:
parent
ab8740058a
commit
5f1a5187f7
|
@ -155,7 +155,7 @@ def b32encode(s):
|
|||
leftover = len(s) % 5
|
||||
# Pad the last quantum with zero bits if necessary
|
||||
if leftover:
|
||||
s = s + bytes(5 - leftover) # Don't use += !
|
||||
s = s + b'\0' * (5 - leftover) # Don't use += !
|
||||
encoded = bytearray()
|
||||
from_bytes = int.from_bytes
|
||||
b32tab2 = _b32tab2
|
||||
|
|
|
@ -357,10 +357,10 @@ class GzipFile(_compression.BaseStream):
|
|||
if offset < self.offset:
|
||||
raise OSError('Negative seek in write mode')
|
||||
count = offset - self.offset
|
||||
chunk = bytes(1024)
|
||||
chunk = b'\0' * 1024
|
||||
for i in range(count // 1024):
|
||||
self.write(chunk)
|
||||
self.write(bytes(count % 1024))
|
||||
self.write(b'\0' * (count % 1024))
|
||||
elif self.mode == READ:
|
||||
self._check_not_closed()
|
||||
return self._buffer.seek(offset, whence)
|
||||
|
|
|
@ -77,7 +77,7 @@ class HMAC:
|
|||
if len(key) > blocksize:
|
||||
key = self.digest_cons(key).digest()
|
||||
|
||||
key = key + bytes(blocksize - len(key))
|
||||
key = key.ljust(blocksize, b'\0')
|
||||
self.outer.update(key.translate(trans_5C))
|
||||
self.inner.update(key.translate(trans_36))
|
||||
if msg is not None:
|
||||
|
|
|
@ -707,7 +707,7 @@ def read_unicodestring8(f):
|
|||
>>> enc = s.encode('utf-8')
|
||||
>>> enc
|
||||
b'abcd\xea\xaf\x8d'
|
||||
>>> n = bytes([len(enc)]) + bytes(7) # little-endian 8-byte length
|
||||
>>> n = bytes([len(enc)]) + b'\0' * 7 # little-endian 8-byte length
|
||||
>>> t = read_unicodestring8(io.BytesIO(n + enc + b'junk'))
|
||||
>>> s == t
|
||||
True
|
||||
|
|
|
@ -59,7 +59,7 @@ class BufferSizeTest:
|
|||
self.drive_one(b"1234567890\00\01\02\03\04\05\06")
|
||||
|
||||
def test_nullpat(self):
|
||||
self.drive_one(bytes(1000))
|
||||
self.drive_one(b'\0' * 1000)
|
||||
|
||||
|
||||
class CBufferSizeTest(BufferSizeTest, unittest.TestCase):
|
||||
|
|
|
@ -562,11 +562,11 @@ class BZ2FileTest(BaseTest):
|
|||
|
||||
def testDecompressLimited(self):
|
||||
"""Decompressed data buffering should be limited"""
|
||||
bomb = bz2.compress(bytes(int(2e6)), compresslevel=9)
|
||||
bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9)
|
||||
self.assertLess(len(bomb), _compression.BUFFER_SIZE)
|
||||
|
||||
decomp = BZ2File(BytesIO(bomb))
|
||||
self.assertEqual(bytes(1), decomp.read(1))
|
||||
self.assertEqual(decomp.read(1), b'\0')
|
||||
max_decomp = 1 + DEFAULT_BUFFER_SIZE
|
||||
self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
|
||||
"Excessive amount of data was decompressed")
|
||||
|
|
|
@ -434,12 +434,12 @@ class TestGzip(BaseTest):
|
|||
|
||||
def test_decompress_limited(self):
|
||||
"""Decompressed data buffering should be limited"""
|
||||
bomb = gzip.compress(bytes(int(2e6)), compresslevel=9)
|
||||
bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
|
||||
self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)
|
||||
|
||||
bomb = io.BytesIO(bomb)
|
||||
decomp = gzip.GzipFile(fileobj=bomb)
|
||||
self.assertEqual(bytes(1), decomp.read(1))
|
||||
self.assertEqual(decomp.read(1), b'\0')
|
||||
max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
|
||||
self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
|
||||
"Excessive amount of data was decompressed")
|
||||
|
|
|
@ -1812,7 +1812,7 @@ class BufferedRWPairTest(unittest.TestCase):
|
|||
with self.subTest(method):
|
||||
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
||||
|
||||
data = byteslike(5)
|
||||
data = byteslike(b'\0' * 5)
|
||||
self.assertEqual(getattr(pair, method)(data), 5)
|
||||
self.assertEqual(bytes(data), b"abcde")
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ class CommonTestMixin_v4(CommonTestMixin):
|
|||
|
||||
def test_bad_packed_length(self):
|
||||
def assertBadLength(length):
|
||||
addr = bytes(length)
|
||||
addr = b'\0' * length
|
||||
msg = "%r (len %d != 4) is not permitted as an IPv4 address"
|
||||
with self.assertAddressError(re.escape(msg % (addr, length))):
|
||||
self.factory(addr)
|
||||
|
@ -139,11 +139,11 @@ class CommonTestMixin_v6(CommonTestMixin):
|
|||
self.assertInstancesEqual(3232235521, "::c0a8:1")
|
||||
|
||||
def test_packed(self):
|
||||
addr = bytes(12) + bytes.fromhex("00000000")
|
||||
addr = b'\0'*12 + bytes.fromhex("00000000")
|
||||
self.assertInstancesEqual(addr, "::")
|
||||
addr = bytes(12) + bytes.fromhex("c0a80001")
|
||||
addr = b'\0'*12 + bytes.fromhex("c0a80001")
|
||||
self.assertInstancesEqual(addr, "::c0a8:1")
|
||||
addr = bytes.fromhex("c0a80001") + bytes(12)
|
||||
addr = bytes.fromhex("c0a80001") + b'\0'*12
|
||||
self.assertInstancesEqual(addr, "c0a8:1::")
|
||||
|
||||
def test_negative_ints_rejected(self):
|
||||
|
@ -158,7 +158,7 @@ class CommonTestMixin_v6(CommonTestMixin):
|
|||
|
||||
def test_bad_packed_length(self):
|
||||
def assertBadLength(length):
|
||||
addr = bytes(length)
|
||||
addr = b'\0' * length
|
||||
msg = "%r (len %d != 16) is not permitted as an IPv6 address"
|
||||
with self.assertAddressError(re.escape(msg % (addr, length))):
|
||||
self.factory(addr)
|
||||
|
|
|
@ -928,11 +928,11 @@ class FileTestCase(unittest.TestCase):
|
|||
|
||||
def test_decompress_limited(self):
|
||||
"""Decompressed data buffering should be limited"""
|
||||
bomb = lzma.compress(bytes(int(2e6)), preset=6)
|
||||
bomb = lzma.compress(b'\0' * int(2e6), preset=6)
|
||||
self.assertLess(len(bomb), _compression.BUFFER_SIZE)
|
||||
|
||||
decomp = LZMAFile(BytesIO(bomb))
|
||||
self.assertEqual(bytes(1), decomp.read(1))
|
||||
self.assertEqual(decomp.read(1), b'\0')
|
||||
max_decomp = 1 + DEFAULT_BUFFER_SIZE
|
||||
self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
|
||||
"Excessive amount of data was decompressed")
|
||||
|
|
|
@ -406,7 +406,7 @@ class SocketWriterTest(unittest.TestCase):
|
|||
self.server.sent1 = self.wfile.write(b'write data\n')
|
||||
# Should be sent immediately, without requiring flush()
|
||||
self.server.received = self.rfile.readline()
|
||||
big_chunk = bytes(test.support.SOCK_MAX_SIZE)
|
||||
big_chunk = b'\0' * test.support.SOCK_MAX_SIZE
|
||||
self.server.sent2 = self.wfile.write(big_chunk)
|
||||
|
||||
server = socketserver.TCPServer((HOST, 0), Handler)
|
||||
|
|
|
@ -258,7 +258,7 @@ class IntegrationTests(TestCase):
|
|||
|
||||
def app(environ, start_response):
|
||||
start_response("200 OK", [])
|
||||
return [bytes(support.SOCK_MAX_SIZE)]
|
||||
return [b'\0' * support.SOCK_MAX_SIZE]
|
||||
|
||||
class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
|
||||
pass
|
||||
|
|
Loading…
Reference in New Issue