Make gdbm and dumbdbm use byte strings. Updated their tests.
This commit is contained in:
parent
517bcfeb6b
commit
6252e10ed9
|
@ -21,12 +21,11 @@ is read when the database is opened, and some updates rewrite the whole index)
|
|||
|
||||
"""
|
||||
|
||||
import io as _io
|
||||
import os as _os
|
||||
import __builtin__
|
||||
import UserDict
|
||||
|
||||
_open = __builtin__.open
|
||||
|
||||
_BLOCKSIZE = 512
|
||||
|
||||
error = IOError # For anydbm
|
||||
|
@ -42,7 +41,7 @@ class _Database(UserDict.DictMixin):
|
|||
# _commit() finish successfully, we can't ignore shutdown races
|
||||
# here, and _commit() must not reference any globals.
|
||||
_os = _os # for _commit()
|
||||
_open = _open # for _commit()
|
||||
_io = _io # for _commit()
|
||||
|
||||
def __init__(self, filebasename, mode):
|
||||
self._mode = mode
|
||||
|
@ -66,9 +65,9 @@ class _Database(UserDict.DictMixin):
|
|||
|
||||
# Mod by Jack: create data file if needed
|
||||
try:
|
||||
f = _open(self._datfile, 'r')
|
||||
f = _io.open(self._datfile, 'r')
|
||||
except IOError:
|
||||
f = _open(self._datfile, 'w')
|
||||
f = _io.open(self._datfile, 'w')
|
||||
self._chmod(self._datfile)
|
||||
f.close()
|
||||
self._update()
|
||||
|
@ -77,7 +76,7 @@ class _Database(UserDict.DictMixin):
|
|||
def _update(self):
|
||||
self._index = {}
|
||||
try:
|
||||
f = _open(self._dirfile)
|
||||
f = _io.open(self._dirfile, 'r')
|
||||
except IOError:
|
||||
pass
|
||||
else:
|
||||
|
@ -107,7 +106,7 @@ class _Database(UserDict.DictMixin):
|
|||
except self._os.error:
|
||||
pass
|
||||
|
||||
f = self._open(self._dirfile, 'w')
|
||||
f = self._io.open(self._dirfile, 'w')
|
||||
self._chmod(self._dirfile)
|
||||
for key, pos_and_siz_pair in self._index.items():
|
||||
f.write("%r, %r\n" % (key, pos_and_siz_pair))
|
||||
|
@ -117,7 +116,7 @@ class _Database(UserDict.DictMixin):
|
|||
|
||||
def __getitem__(self, key):
|
||||
pos, siz = self._index[key] # may raise KeyError
|
||||
f = _open(self._datfile, 'rb')
|
||||
f = _io.open(self._datfile, 'rb')
|
||||
f.seek(pos)
|
||||
dat = f.read(siz)
|
||||
f.close()
|
||||
|
@ -128,11 +127,11 @@ class _Database(UserDict.DictMixin):
|
|||
# to get to an aligned offset. Return pair
|
||||
# (starting offset of val, len(val))
|
||||
def _addval(self, val):
|
||||
f = _open(self._datfile, 'rb+')
|
||||
f = _io.open(self._datfile, 'rb+')
|
||||
f.seek(0, 2)
|
||||
pos = int(f.tell())
|
||||
npos = ((pos + _BLOCKSIZE - 1) // _BLOCKSIZE) * _BLOCKSIZE
|
||||
f.write('\0'*(npos-pos))
|
||||
f.write(b'\0'*(npos-pos))
|
||||
pos = npos
|
||||
f.write(val)
|
||||
f.close()
|
||||
|
@ -143,7 +142,7 @@ class _Database(UserDict.DictMixin):
|
|||
# pos to hold val, without overwriting some other value. Return
|
||||
# pair (pos, len(val)).
|
||||
def _setval(self, pos, val):
|
||||
f = _open(self._datfile, 'rb+')
|
||||
f = _io.open(self._datfile, 'rb+')
|
||||
f.seek(pos)
|
||||
f.write(val)
|
||||
f.close()
|
||||
|
@ -154,14 +153,16 @@ class _Database(UserDict.DictMixin):
|
|||
# the in-memory index dict, and append one to the directory file.
|
||||
def _addkey(self, key, pos_and_siz_pair):
|
||||
self._index[key] = pos_and_siz_pair
|
||||
f = _open(self._dirfile, 'a')
|
||||
f = _io.open(self._dirfile, 'a')
|
||||
self._chmod(self._dirfile)
|
||||
f.write("%r, %r\n" % (key, pos_and_siz_pair))
|
||||
f.close()
|
||||
|
||||
def __setitem__(self, key, val):
|
||||
if not type(key) == type('') == type(val):
|
||||
raise TypeError, "keys and values must be strings"
|
||||
if not isinstance(key, basestring):
|
||||
raise TypeError("keys must be strings")
|
||||
if not isinstance(val, (str8, bytes)):
|
||||
raise TypeError("values must be byte strings")
|
||||
if key not in self._index:
|
||||
self._addkey(key, self._addval(val))
|
||||
else:
|
||||
|
|
|
@ -21,13 +21,13 @@ def _delete_files():
|
|||
pass
|
||||
|
||||
class AnyDBMTestCase(unittest.TestCase):
|
||||
_dict = {'0': '',
|
||||
'a': 'Python:',
|
||||
'b': 'Programming',
|
||||
'c': 'the',
|
||||
'd': 'way',
|
||||
'f': 'Guido',
|
||||
'g': 'intended'
|
||||
_dict = {'0': b'',
|
||||
'a': b'Python:',
|
||||
'b': b'Programming',
|
||||
'c': b'the',
|
||||
'd': b'way',
|
||||
'f': b'Guido',
|
||||
'g': b'intended',
|
||||
}
|
||||
|
||||
def __init__(self, *args):
|
||||
|
@ -44,7 +44,7 @@ class AnyDBMTestCase(unittest.TestCase):
|
|||
def test_anydbm_modification(self):
|
||||
self.init_db()
|
||||
f = anydbm.open(_fname, 'c')
|
||||
self._dict['g'] = f['g'] = "indented"
|
||||
self._dict['g'] = f['g'] = b"indented"
|
||||
self.read_helper(f)
|
||||
f.close()
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
Original by Roger E. Masse
|
||||
"""
|
||||
|
||||
import io
|
||||
import os
|
||||
import unittest
|
||||
import dumbdbm
|
||||
|
@ -18,13 +19,13 @@ def _delete_files():
|
|||
pass
|
||||
|
||||
class DumbDBMTestCase(unittest.TestCase):
|
||||
_dict = {'0': '',
|
||||
'a': 'Python:',
|
||||
'b': 'Programming',
|
||||
'c': 'the',
|
||||
'd': 'way',
|
||||
'f': 'Guido',
|
||||
'g': 'intended'
|
||||
_dict = {'0': b'',
|
||||
'a': b'Python:',
|
||||
'b': b'Programming',
|
||||
'c': b'the',
|
||||
'd': b'way',
|
||||
'f': b'Guido',
|
||||
'g': b'intended',
|
||||
}
|
||||
|
||||
def __init__(self, *args):
|
||||
|
@ -64,15 +65,15 @@ class DumbDBMTestCase(unittest.TestCase):
|
|||
|
||||
def test_close_twice(self):
|
||||
f = dumbdbm.open(_fname)
|
||||
f['a'] = 'b'
|
||||
self.assertEqual(f['a'], 'b')
|
||||
f['a'] = b'b'
|
||||
self.assertEqual(f['a'], b'b')
|
||||
f.close()
|
||||
f.close()
|
||||
|
||||
def test_dumbdbm_modification(self):
|
||||
self.init_db()
|
||||
f = dumbdbm.open(_fname, 'w')
|
||||
self._dict['g'] = f['g'] = "indented"
|
||||
self._dict['g'] = f['g'] = b"indented"
|
||||
self.read_helper(f)
|
||||
f.close()
|
||||
|
||||
|
@ -91,29 +92,29 @@ class DumbDBMTestCase(unittest.TestCase):
|
|||
def test_write_write_read(self):
|
||||
# test for bug #482460
|
||||
f = dumbdbm.open(_fname)
|
||||
f['1'] = 'hello'
|
||||
f['1'] = 'hello2'
|
||||
f['1'] = b'hello'
|
||||
f['1'] = b'hello2'
|
||||
f.close()
|
||||
f = dumbdbm.open(_fname)
|
||||
self.assertEqual(f['1'], 'hello2')
|
||||
self.assertEqual(f['1'], b'hello2')
|
||||
f.close()
|
||||
|
||||
def test_line_endings(self):
|
||||
# test for bug #1172763: dumbdbm would die if the line endings
|
||||
# weren't what was expected.
|
||||
f = dumbdbm.open(_fname)
|
||||
f['1'] = 'hello'
|
||||
f['2'] = 'hello2'
|
||||
f['1'] = b'hello'
|
||||
f['2'] = b'hello2'
|
||||
f.close()
|
||||
|
||||
# Mangle the file by adding \r before each newline
|
||||
data = open(_fname + '.dir').read()
|
||||
data = data.replace('\n', '\r\n')
|
||||
open(_fname + '.dir', 'wb').write(data)
|
||||
data = io.open(_fname + '.dir', 'rb').read()
|
||||
data = data.replace(b'\n', b'\r\n')
|
||||
io.open(_fname + '.dir', 'wb').write(data)
|
||||
|
||||
f = dumbdbm.open(_fname)
|
||||
self.assertEqual(f['1'], 'hello')
|
||||
self.assertEqual(f['2'], 'hello2')
|
||||
self.assertEqual(f['1'], b'hello')
|
||||
self.assertEqual(f['2'], b'hello2')
|
||||
|
||||
|
||||
def read_helper(self, f):
|
||||
|
@ -147,7 +148,7 @@ class DumbDBMTestCase(unittest.TestCase):
|
|||
del d[k]
|
||||
del f[k]
|
||||
else:
|
||||
v = random.choice('abc') * random.randrange(10000)
|
||||
v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
|
||||
d[k] = v
|
||||
f[k] = v
|
||||
self.assertEqual(f[k], v)
|
||||
|
|
|
@ -12,7 +12,7 @@ filename = TESTFN
|
|||
g = gdbm.open(filename, 'c')
|
||||
verify(g.keys() == [])
|
||||
g['a'] = 'b'
|
||||
g['12345678910'] = '019237410982340912840198242'
|
||||
g['12345678910'] = b'019237410982340912840198242'
|
||||
a = g.keys()
|
||||
if verbose:
|
||||
print('Test gdbm file keys: ', a)
|
||||
|
|
|
@ -51,7 +51,7 @@ for name in anydbm._names:
|
|||
self.assertEqual(name, whichdb.whichdb(_fname))
|
||||
# Now add a key
|
||||
f = mod.open(_fname, 'w')
|
||||
f["1"] = "1"
|
||||
f["1"] = b"1"
|
||||
f.close()
|
||||
self.assertEqual(name, whichdb.whichdb(_fname))
|
||||
setattr(WhichDBTestCase,"test_whichdb_%s" % name, test_whichdb_name)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# !/usr/bin/env python
|
||||
"""Guess which db package to use to open a db file."""
|
||||
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
|
@ -29,18 +30,18 @@ def whichdb(filename):
|
|||
|
||||
# Check for dbm first -- this has a .pag and a .dir file
|
||||
try:
|
||||
f = open(filename + os.extsep + "pag", "rb")
|
||||
f = io.open(filename + os.extsep + "pag", "rb")
|
||||
f.close()
|
||||
# dbm linked with gdbm on OS/2 doesn't have .dir file
|
||||
if not (dbm.library == "GNU gdbm" and sys.platform == "os2emx"):
|
||||
f = open(filename + os.extsep + "dir", "rb")
|
||||
f = io.open(filename + os.extsep + "dir", "rb")
|
||||
f.close()
|
||||
return "dbm"
|
||||
except IOError:
|
||||
# some dbm emulations based on Berkeley DB generate a .db file
|
||||
# some do not, but they should be caught by the dbhash checks
|
||||
try:
|
||||
f = open(filename + os.extsep + "db", "rb")
|
||||
f = io.open(filename + os.extsep + "db", "rb")
|
||||
f.close()
|
||||
# guarantee we can actually open the file using dbm
|
||||
# kind of overkill, but since we are dealing with emulations
|
||||
|
@ -60,9 +61,9 @@ def whichdb(filename):
|
|||
# dumbdbm files with no keys are empty
|
||||
if size == 0:
|
||||
return "dumbdbm"
|
||||
f = open(filename + os.extsep + "dir", "rb")
|
||||
f = io.open(filename + os.extsep + "dir", "rb")
|
||||
try:
|
||||
if f.read(1) in ("'", '"'):
|
||||
if f.read(1) in (b"'", b'"'):
|
||||
return "dumbdbm"
|
||||
finally:
|
||||
f.close()
|
||||
|
@ -71,7 +72,7 @@ def whichdb(filename):
|
|||
|
||||
# See if the file exists, return None if not
|
||||
try:
|
||||
f = open(filename, "rb")
|
||||
f = io.open(filename, "rb")
|
||||
except IOError:
|
||||
return None
|
||||
|
||||
|
|
|
@ -127,11 +127,10 @@ dbm_subscript(dbmobject *dp, register PyObject *key)
|
|||
}
|
||||
drec = gdbm_fetch(dp->di_dbm, krec);
|
||||
if (drec.dptr == 0) {
|
||||
PyErr_SetString(PyExc_KeyError,
|
||||
PyString_AS_STRING((PyStringObject *)key));
|
||||
PyErr_SetObject(PyExc_KeyError, key);
|
||||
return NULL;
|
||||
}
|
||||
v = PyString_FromStringAndSize(drec.dptr, drec.dsize);
|
||||
v = PyBytes_FromStringAndSize(drec.dptr, drec.dsize);
|
||||
free(drec.dptr);
|
||||
return v;
|
||||
}
|
||||
|
@ -154,15 +153,14 @@ dbm_ass_sub(dbmobject *dp, PyObject *v, PyObject *w)
|
|||
dp->di_size = -1;
|
||||
if (w == NULL) {
|
||||
if (gdbm_delete(dp->di_dbm, krec) < 0) {
|
||||
PyErr_SetString(PyExc_KeyError,
|
||||
PyString_AS_STRING((PyStringObject *)v));
|
||||
PyErr_SetObject(PyExc_KeyError, v);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (!PyArg_Parse(w, "s#", &drec.dptr, &drec.dsize)) {
|
||||
PyErr_SetString(PyExc_TypeError,
|
||||
"gdbm mappings have string elements only");
|
||||
"gdbm mappings have byte string elements only");
|
||||
return -1;
|
||||
}
|
||||
errno = 0;
|
||||
|
@ -198,6 +196,7 @@ dbm_close(register dbmobject *dp, PyObject *unused)
|
|||
return Py_None;
|
||||
}
|
||||
|
||||
/* XXX Should return a set or a set view */
|
||||
PyDoc_STRVAR(dbm_keys__doc__,
|
||||
"keys() -> list_of_keys\n\
|
||||
Get a list of all keys in the database.");
|
||||
|
@ -252,6 +251,11 @@ dbm_contains(PyObject *self, PyObject *arg)
|
|||
"GDBM object has already been closed");
|
||||
return -1;
|
||||
}
|
||||
if (PyUnicode_Check(arg)) {
|
||||
arg = _PyUnicode_AsDefaultEncodedString(arg, NULL);
|
||||
if (arg == NULL)
|
||||
return -1;
|
||||
}
|
||||
if (!PyString_Check(arg)) {
|
||||
PyErr_Format(PyExc_TypeError,
|
||||
"gdbm key must be string, not %.100s",
|
||||
|
|
Loading…
Reference in New Issue