context managerify
This commit is contained in:
parent
aa7cec0ac4
commit
5dc8fabb01
|
@ -1,6 +1,10 @@
|
||||||
from test.support import TESTFN, run_unittest, import_module
|
from test.support import TESTFN, run_unittest, import_module
|
||||||
import unittest
|
import unittest
|
||||||
import os, re, itertools, socket
|
import os
|
||||||
|
import re
|
||||||
|
import itertools
|
||||||
|
import socket
|
||||||
|
import sys
|
||||||
|
|
||||||
# Skip test if we can't import mmap.
|
# Skip test if we can't import mmap.
|
||||||
mmap = import_module('mmap')
|
mmap = import_module('mmap')
|
||||||
|
@ -116,126 +120,119 @@ class MmapTests(unittest.TestCase):
|
||||||
def test_access_parameter(self):
|
def test_access_parameter(self):
|
||||||
# Test for "access" keyword parameter
|
# Test for "access" keyword parameter
|
||||||
mapsize = 10
|
mapsize = 10
|
||||||
open(TESTFN, "wb").write(b"a"*mapsize)
|
with open(TESTFN, "wb") as fp:
|
||||||
f = open(TESTFN, "rb")
|
fp.write(b"a"*mapsize)
|
||||||
m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
|
with open(TESTFN, "rb") as f:
|
||||||
self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
|
m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
|
||||||
|
self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
|
||||||
|
|
||||||
# Ensuring that readonly mmap can't be slice assigned
|
# Ensuring that readonly mmap can't be slice assigned
|
||||||
try:
|
try:
|
||||||
m[:] = b'b'*mapsize
|
m[:] = b'b'*mapsize
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
self.fail("Able to write to readonly memory map")
|
self.fail("Able to write to readonly memory map")
|
||||||
|
|
||||||
# Ensuring that readonly mmap can't be item assigned
|
# Ensuring that readonly mmap can't be item assigned
|
||||||
try:
|
try:
|
||||||
m[0] = b'b'
|
m[0] = b'b'
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
self.fail("Able to write to readonly memory map")
|
self.fail("Able to write to readonly memory map")
|
||||||
|
|
||||||
# Ensuring that readonly mmap can't be write() to
|
# Ensuring that readonly mmap can't be write() to
|
||||||
try:
|
try:
|
||||||
m.seek(0,0)
|
m.seek(0,0)
|
||||||
m.write(b'abc')
|
m.write(b'abc')
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
self.fail("Able to write to readonly memory map")
|
self.fail("Able to write to readonly memory map")
|
||||||
|
|
||||||
# Ensuring that readonly mmap can't be write_byte() to
|
# Ensuring that readonly mmap can't be write_byte() to
|
||||||
try:
|
try:
|
||||||
m.seek(0,0)
|
m.seek(0,0)
|
||||||
m.write_byte(b'd')
|
m.write_byte(b'd')
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
self.fail("Able to write to readonly memory map")
|
self.fail("Able to write to readonly memory map")
|
||||||
|
|
||||||
# Ensuring that readonly mmap can't be resized
|
# Ensuring that readonly mmap can't be resized
|
||||||
try:
|
try:
|
||||||
m.resize(2*mapsize)
|
m.resize(2*mapsize)
|
||||||
except SystemError: # resize is not universally supported
|
except SystemError: # resize is not universally supported
|
||||||
pass
|
pass
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
self.fail("Able to resize readonly memory map")
|
self.fail("Able to resize readonly memory map")
|
||||||
f.close()
|
with open(TESTFN, "rb") as fp:
|
||||||
del m, f
|
self.assertEqual(fp.read(), b'a'*mapsize,
|
||||||
self.assertEqual(open(TESTFN, "rb").read(), b'a'*mapsize,
|
"Readonly memory map data file was modified")
|
||||||
"Readonly memory map data file was modified")
|
|
||||||
|
|
||||||
# Opening mmap with size too big
|
# Opening mmap with size too big
|
||||||
import sys
|
with open(TESTFN, "r+b") as f:
|
||||||
f = open(TESTFN, "r+b")
|
try:
|
||||||
try:
|
m = mmap.mmap(f.fileno(), mapsize+1)
|
||||||
m = mmap.mmap(f.fileno(), mapsize+1)
|
except ValueError:
|
||||||
except ValueError:
|
# we do not expect a ValueError on Windows
|
||||||
# we do not expect a ValueError on Windows
|
# CAUTION: This also changes the size of the file on disk, and
|
||||||
# CAUTION: This also changes the size of the file on disk, and
|
# later tests assume that the length hasn't changed. We need to
|
||||||
# later tests assume that the length hasn't changed. We need to
|
# repair that.
|
||||||
# repair that.
|
if sys.platform.startswith('win'):
|
||||||
|
self.fail("Opening mmap with size+1 should work on Windows.")
|
||||||
|
else:
|
||||||
|
# we expect a ValueError on Unix, but not on Windows
|
||||||
|
if not sys.platform.startswith('win'):
|
||||||
|
self.fail("Opening mmap with size+1 should raise ValueError.")
|
||||||
|
m.close()
|
||||||
if sys.platform.startswith('win'):
|
if sys.platform.startswith('win'):
|
||||||
self.fail("Opening mmap with size+1 should work on Windows.")
|
# Repair damage from the resizing test.
|
||||||
else:
|
with open(TESTFN, 'r+b') as f:
|
||||||
# we expect a ValueError on Unix, but not on Windows
|
f.truncate(mapsize)
|
||||||
if not sys.platform.startswith('win'):
|
|
||||||
self.fail("Opening mmap with size+1 should raise ValueError.")
|
|
||||||
m.close()
|
|
||||||
f.close()
|
|
||||||
if sys.platform.startswith('win'):
|
|
||||||
# Repair damage from the resizing test.
|
|
||||||
f = open(TESTFN, 'r+b')
|
|
||||||
f.truncate(mapsize)
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
# Opening mmap with access=ACCESS_WRITE
|
# Opening mmap with access=ACCESS_WRITE
|
||||||
f = open(TESTFN, "r+b")
|
with open(TESTFN, "r+b") as f:
|
||||||
m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
|
m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
|
||||||
# Modifying write-through memory map
|
# Modifying write-through memory map
|
||||||
m[:] = b'c'*mapsize
|
m[:] = b'c'*mapsize
|
||||||
self.assertEqual(m[:], b'c'*mapsize,
|
self.assertEqual(m[:], b'c'*mapsize,
|
||||||
"Write-through memory map memory not updated properly.")
|
"Write-through memory map memory not updated properly.")
|
||||||
m.flush()
|
m.flush()
|
||||||
m.close()
|
m.close()
|
||||||
f.close()
|
with open(TESTFN, 'rb') as f:
|
||||||
f = open(TESTFN, 'rb')
|
stuff = f.read()
|
||||||
stuff = f.read()
|
|
||||||
f.close()
|
|
||||||
self.assertEqual(stuff, b'c'*mapsize,
|
self.assertEqual(stuff, b'c'*mapsize,
|
||||||
"Write-through memory map data file not updated properly.")
|
"Write-through memory map data file not updated properly.")
|
||||||
|
|
||||||
# Opening mmap with access=ACCESS_COPY
|
# Opening mmap with access=ACCESS_COPY
|
||||||
f = open(TESTFN, "r+b")
|
with open(TESTFN, "r+b") as f:
|
||||||
m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
|
m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
|
||||||
# Modifying copy-on-write memory map
|
# Modifying copy-on-write memory map
|
||||||
m[:] = b'd'*mapsize
|
m[:] = b'd'*mapsize
|
||||||
self.assertEqual(m[:], b'd' * mapsize,
|
self.assertEqual(m[:], b'd' * mapsize,
|
||||||
"Copy-on-write memory map data not written correctly.")
|
"Copy-on-write memory map data not written correctly.")
|
||||||
m.flush()
|
m.flush()
|
||||||
self.assertEqual(open(TESTFN, "rb").read(), b'c'*mapsize,
|
with open(TESTFN, "rb") as fp:
|
||||||
"Copy-on-write test data file should not be modified.")
|
self.assertEqual(fp.read(), b'c'*mapsize,
|
||||||
# Ensuring copy-on-write maps cannot be resized
|
"Copy-on-write test data file should not be modified.")
|
||||||
self.assertRaises(TypeError, m.resize, 2*mapsize)
|
# Ensuring copy-on-write maps cannot be resized
|
||||||
f.close()
|
self.assertRaises(TypeError, m.resize, 2*mapsize)
|
||||||
del m, f
|
m.close()
|
||||||
|
|
||||||
# Ensuring invalid access parameter raises exception
|
# Ensuring invalid access parameter raises exception
|
||||||
f = open(TESTFN, "r+b")
|
with open(TESTFN, "r+b") as f:
|
||||||
self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
|
self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
|
||||||
f.close()
|
|
||||||
|
|
||||||
if os.name == "posix":
|
if os.name == "posix":
|
||||||
# Try incompatible flags, prot and access parameters.
|
# Try incompatible flags, prot and access parameters.
|
||||||
f = open(TESTFN, "r+b")
|
with open(TESTFN, "r+b") as f:
|
||||||
self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
|
self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
|
||||||
flags=mmap.MAP_PRIVATE,
|
flags=mmap.MAP_PRIVATE,
|
||||||
prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
|
prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
|
||||||
f.close()
|
|
||||||
|
|
||||||
def test_bad_file_desc(self):
|
def test_bad_file_desc(self):
|
||||||
# Try opening a bad file descriptor...
|
# Try opening a bad file descriptor...
|
||||||
|
@ -244,14 +241,13 @@ class MmapTests(unittest.TestCase):
|
||||||
def test_tougher_find(self):
|
def test_tougher_find(self):
|
||||||
# Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
|
# Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
|
||||||
# searching for data with embedded \0 bytes didn't work.
|
# searching for data with embedded \0 bytes didn't work.
|
||||||
f = open(TESTFN, 'wb+')
|
with open(TESTFN, 'wb+') as f:
|
||||||
|
|
||||||
data = b'aabaac\x00deef\x00\x00aa\x00'
|
data = b'aabaac\x00deef\x00\x00aa\x00'
|
||||||
n = len(data)
|
n = len(data)
|
||||||
f.write(data)
|
f.write(data)
|
||||||
f.flush()
|
f.flush()
|
||||||
m = mmap.mmap(f.fileno(), n)
|
m = mmap.mmap(f.fileno(), n)
|
||||||
f.close()
|
|
||||||
|
|
||||||
for start in range(n+1):
|
for start in range(n+1):
|
||||||
for finish in range(start, n+1):
|
for finish in range(start, n+1):
|
||||||
|
@ -494,7 +490,8 @@ class MmapTests(unittest.TestCase):
|
||||||
if not hasattr(mmap, 'PROT_READ'):
|
if not hasattr(mmap, 'PROT_READ'):
|
||||||
return
|
return
|
||||||
mapsize = 10
|
mapsize = 10
|
||||||
open(TESTFN, "wb").write(b"a"*mapsize)
|
with open(TESTFN, "wb") as fp:
|
||||||
|
fp.write(b"a"*mapsize)
|
||||||
f = open(TESTFN, "rb")
|
f = open(TESTFN, "rb")
|
||||||
m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
|
m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
|
||||||
self.assertRaises(TypeError, m.write, "foo")
|
self.assertRaises(TypeError, m.write, "foo")
|
||||||
|
@ -506,7 +503,8 @@ class MmapTests(unittest.TestCase):
|
||||||
|
|
||||||
def test_io_methods(self):
|
def test_io_methods(self):
|
||||||
data = b"0123456789"
|
data = b"0123456789"
|
||||||
open(TESTFN, "wb").write(b"x"*len(data))
|
with open(TESTFN, "wb") as fp:
|
||||||
|
fp.write(b"x"*len(data))
|
||||||
f = open(TESTFN, "r+b")
|
f = open(TESTFN, "r+b")
|
||||||
m = mmap.mmap(f.fileno(), len(data))
|
m = mmap.mmap(f.fileno(), len(data))
|
||||||
f.close()
|
f.close()
|
||||||
|
@ -572,7 +570,8 @@ class MmapTests(unittest.TestCase):
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
# Should not crash (Issue 5385)
|
# Should not crash (Issue 5385)
|
||||||
open(TESTFN, "wb").write(b"x"*10)
|
with open(TESTFN, "wb") as fp:
|
||||||
|
fp.write(b"x"*10)
|
||||||
f = open(TESTFN, "r+b")
|
f = open(TESTFN, "r+b")
|
||||||
m = mmap.mmap(f.fileno(), 0)
|
m = mmap.mmap(f.fileno(), 0)
|
||||||
f.close()
|
f.close()
|
||||||
|
|
Loading…
Reference in New Issue