392 lines
15 KiB
Python
392 lines
15 KiB
Python
"""Tests for the packaging.install module."""
|
|
import os
|
|
import logging
|
|
from sysconfig import is_python_build
|
|
from tempfile import mkstemp
|
|
|
|
from packaging import install
|
|
from packaging.pypi.xmlrpc import Client
|
|
from packaging.metadata import Metadata
|
|
from packaging.tests.support import (LoggingCatcher, TempdirManager, unittest,
|
|
fake_dec)
|
|
try:
|
|
import threading
|
|
from packaging.tests.pypi_server import use_xmlrpc_server
|
|
except ImportError:
|
|
threading = None
|
|
use_xmlrpc_server = fake_dec
|
|
|
|
|
|
class InstalledDist:
|
|
"""Distribution object, represent distributions currently installed on the
|
|
system"""
|
|
def __init__(self, name, version, deps):
|
|
self.metadata = Metadata()
|
|
self.name = name
|
|
self.version = version
|
|
self.metadata['Name'] = name
|
|
self.metadata['Version'] = version
|
|
self.metadata['Requires-Dist'] = deps
|
|
|
|
def __repr__(self):
|
|
return '<InstalledDist %r>' % self.metadata['Name']
|
|
|
|
|
|
class ToInstallDist:
|
|
"""Distribution that will be installed"""
|
|
|
|
def __init__(self, files=False):
|
|
self._files = files
|
|
self.install_called = False
|
|
self.install_called_with = {}
|
|
self.uninstall_called = False
|
|
self._real_files = []
|
|
self.name = "fake"
|
|
self.version = "fake"
|
|
if files:
|
|
for f in range(0, 3):
|
|
fp, fn = mkstemp()
|
|
os.close(fp)
|
|
self._real_files.append(fn)
|
|
|
|
def _unlink_installed_files(self):
|
|
if self._files:
|
|
for fn in self._real_files:
|
|
os.unlink(fn)
|
|
|
|
def list_installed_files(self, **args):
|
|
if self._files:
|
|
return self._real_files
|
|
|
|
def get_install(self, **args):
|
|
return self.list_installed_files()
|
|
|
|
|
|
class MagicMock:
|
|
def __init__(self, return_value=None, raise_exception=False):
|
|
self.called = False
|
|
self._times_called = 0
|
|
self._called_with = []
|
|
self._return_value = return_value
|
|
self._raise = raise_exception
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
self.called = True
|
|
self._times_called = self._times_called + 1
|
|
self._called_with.append((args, kwargs))
|
|
iterable = hasattr(self._raise, '__iter__')
|
|
if self._raise:
|
|
if ((not iterable and self._raise)
|
|
or self._raise[self._times_called - 1]):
|
|
raise Exception
|
|
return self._return_value
|
|
|
|
def called_with(self, *args, **kwargs):
|
|
return (args, kwargs) in self._called_with
|
|
|
|
|
|
def get_installed_dists(dists):
|
|
"""Return a list of fake installed dists.
|
|
The list is name, version, deps"""
|
|
objects = []
|
|
for name, version, deps in dists:
|
|
objects.append(InstalledDist(name, version, deps))
|
|
return objects
|
|
|
|
|
|
class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase):
|
|
def _get_client(self, server, *args, **kwargs):
|
|
return Client(server.full_address, *args, **kwargs)
|
|
|
|
def _get_results(self, output):
|
|
"""return a list of results"""
|
|
installed = [(o.name, str(o.version)) for o in output['install']]
|
|
remove = [(o.name, str(o.version)) for o in output['remove']]
|
|
conflict = [(o.name, str(o.version)) for o in output['conflict']]
|
|
return installed, remove, conflict
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_existing_deps(self, server):
|
|
# Test that the installer get the dependencies from the metadatas
|
|
# and ask the index for this dependencies.
|
|
# In this test case, we have choxie that is dependent from towel-stuff
|
|
# 0.1, which is in-turn dependent on bacon <= 0.2:
|
|
# choxie -> towel-stuff -> bacon.
|
|
# Each release metadata is not provided in metadata 1.2.
|
|
client = self._get_client(server)
|
|
archive_path = '%s/distribution.tar.gz' % server.full_address
|
|
server.xmlrpc.set_distributions([
|
|
{'name': 'choxie',
|
|
'version': '2.0.0.9',
|
|
'requires_dist': ['towel-stuff (0.1)'],
|
|
'url': archive_path},
|
|
{'name': 'towel-stuff',
|
|
'version': '0.1',
|
|
'requires_dist': ['bacon (<= 0.2)'],
|
|
'url': archive_path},
|
|
{'name': 'bacon',
|
|
'version': '0.1',
|
|
'requires_dist': [],
|
|
'url': archive_path},
|
|
])
|
|
installed = get_installed_dists([('bacon', '0.1', [])])
|
|
output = install.get_infos("choxie", index=client,
|
|
installed=installed)
|
|
|
|
# we don't have installed bacon as it's already installed system-wide
|
|
self.assertEqual(0, len(output['remove']))
|
|
self.assertEqual(2, len(output['install']))
|
|
readable_output = [(o.name, str(o.version))
|
|
for o in output['install']]
|
|
self.assertIn(('towel-stuff', '0.1'), readable_output)
|
|
self.assertIn(('choxie', '2.0.0.9'), readable_output)
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_upgrade_existing_deps(self, server):
|
|
client = self._get_client(server)
|
|
archive_path = '%s/distribution.tar.gz' % server.full_address
|
|
server.xmlrpc.set_distributions([
|
|
{'name': 'choxie',
|
|
'version': '2.0.0.9',
|
|
'requires_dist': ['towel-stuff (0.1)'],
|
|
'url': archive_path},
|
|
{'name': 'towel-stuff',
|
|
'version': '0.1',
|
|
'requires_dist': ['bacon (>= 0.2)'],
|
|
'url': archive_path},
|
|
{'name': 'bacon',
|
|
'version': '0.2',
|
|
'requires_dist': [],
|
|
'url': archive_path},
|
|
])
|
|
|
|
output = install.get_infos("choxie", index=client,
|
|
installed=get_installed_dists([('bacon', '0.1', [])]))
|
|
installed = [(o.name, str(o.version)) for o in output['install']]
|
|
|
|
# we need bacon 0.2, but 0.1 is installed.
|
|
# So we expect to remove 0.1 and to install 0.2 instead.
|
|
remove = [(o.name, str(o.version)) for o in output['remove']]
|
|
self.assertIn(('choxie', '2.0.0.9'), installed)
|
|
self.assertIn(('towel-stuff', '0.1'), installed)
|
|
self.assertIn(('bacon', '0.2'), installed)
|
|
self.assertIn(('bacon', '0.1'), remove)
|
|
self.assertEqual(0, len(output['conflict']))
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_conflicts(self, server):
|
|
# Tests that conflicts are detected
|
|
client = self._get_client(server)
|
|
archive_path = '%s/distribution.tar.gz' % server.full_address
|
|
|
|
# choxie depends on towel-stuff, which depends on bacon.
|
|
server.xmlrpc.set_distributions([
|
|
{'name': 'choxie',
|
|
'version': '2.0.0.9',
|
|
'requires_dist': ['towel-stuff (0.1)'],
|
|
'url': archive_path},
|
|
{'name': 'towel-stuff',
|
|
'version': '0.1',
|
|
'requires_dist': ['bacon (>= 0.2)'],
|
|
'url': archive_path},
|
|
{'name': 'bacon',
|
|
'version': '0.2',
|
|
'requires_dist': [],
|
|
'url': archive_path},
|
|
])
|
|
|
|
# name, version, deps.
|
|
already_installed = [('bacon', '0.1', []),
|
|
('chicken', '1.1', ['bacon (0.1)'])]
|
|
output = install.get_infos(
|
|
'choxie', index=client,
|
|
installed=get_installed_dists(already_installed))
|
|
|
|
# we need bacon 0.2, but 0.1 is installed.
|
|
# So we expect to remove 0.1 and to install 0.2 instead.
|
|
installed, remove, conflict = self._get_results(output)
|
|
self.assertIn(('choxie', '2.0.0.9'), installed)
|
|
self.assertIn(('towel-stuff', '0.1'), installed)
|
|
self.assertIn(('bacon', '0.2'), installed)
|
|
self.assertIn(('bacon', '0.1'), remove)
|
|
self.assertIn(('chicken', '1.1'), conflict)
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_installation_unexisting_project(self, server):
|
|
# Test that the isntalled raises an exception if the project does not
|
|
# exists.
|
|
client = self._get_client(server)
|
|
self.assertRaises(install.InstallationException,
|
|
install.get_infos,
|
|
'unexisting project', index=client)
|
|
|
|
def test_move_files(self):
|
|
# test that the files are really moved, and that the new path is
|
|
# returned.
|
|
path = self.mkdtemp()
|
|
newpath = self.mkdtemp()
|
|
files = [os.path.join(path, str(x)) for x in range(1, 20)]
|
|
for f in files:
|
|
open(f, 'ab+').close()
|
|
output = [o for o in install._move_files(files, newpath)]
|
|
|
|
# check that output return the list of old/new places
|
|
for file_ in files:
|
|
name = os.path.split(file_)[-1]
|
|
newloc = os.path.join(newpath, name)
|
|
self.assertIn((file_, newloc), output)
|
|
|
|
# remove the files
|
|
for f in [o[1] for o in output]: # o[1] is the new place
|
|
os.remove(f)
|
|
|
|
def test_update_infos(self):
|
|
tests = [[
|
|
{'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']},
|
|
{'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']},
|
|
{'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'],
|
|
'baz': ['foo', 'foo', 'test', 'foo']},
|
|
]]
|
|
|
|
for dict1, dict2, expect in tests:
|
|
install._update_infos(dict1, dict2)
|
|
for key in expect:
|
|
self.assertEqual(expect[key], dict1[key])
|
|
|
|
def test_install_dists_rollback(self):
|
|
# if one of the distribution installation fails, call uninstall on all
|
|
# installed distributions.
|
|
|
|
old_install_dist = install._install_dist
|
|
old_uninstall = getattr(install, 'uninstall', None)
|
|
|
|
install._install_dist = MagicMock(return_value=[],
|
|
raise_exception=(False, True))
|
|
install.remove = MagicMock()
|
|
try:
|
|
d1 = ToInstallDist()
|
|
d2 = ToInstallDist()
|
|
path = self.mkdtemp()
|
|
self.assertRaises(Exception, install.install_dists, [d1, d2], path)
|
|
self.assertTrue(install._install_dist.called_with(d1, path))
|
|
self.assertTrue(install.remove.called)
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
install.remove = old_uninstall
|
|
|
|
def test_install_dists_success(self):
|
|
old_install_dist = install._install_dist
|
|
install._install_dist = MagicMock(return_value=[])
|
|
try:
|
|
# test that the install method is called on each distributions
|
|
d1 = ToInstallDist()
|
|
d2 = ToInstallDist()
|
|
|
|
# should call install
|
|
path = self.mkdtemp()
|
|
install.install_dists([d1, d2], path)
|
|
for dist in (d1, d2):
|
|
self.assertTrue(install._install_dist.called_with(dist, path))
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
|
|
def test_install_from_infos_conflict(self):
|
|
# assert conflicts raise an exception
|
|
self.assertRaises(install.InstallationConflict,
|
|
install.install_from_infos,
|
|
conflicts=[ToInstallDist()])
|
|
|
|
def test_install_from_infos_remove_success(self):
|
|
old_install_dists = install.install_dists
|
|
install.install_dists = lambda x, y=None: None
|
|
try:
|
|
dists = []
|
|
for i in range(2):
|
|
dists.append(ToInstallDist(files=True))
|
|
install.install_from_infos(remove=dists)
|
|
|
|
# assert that the files have been removed
|
|
for dist in dists:
|
|
for f in dist.list_installed_files():
|
|
self.assertFalse(os.path.exists(f))
|
|
finally:
|
|
install.install_dists = old_install_dists
|
|
|
|
def test_install_from_infos_remove_rollback(self):
|
|
old_install_dist = install._install_dist
|
|
old_uninstall = getattr(install, 'uninstall', None)
|
|
|
|
install._install_dist = MagicMock(return_value=[],
|
|
raise_exception=(False, True))
|
|
install.uninstall = MagicMock()
|
|
try:
|
|
# assert that if an error occurs, the removed files are restored.
|
|
remove = []
|
|
for i in range(2):
|
|
remove.append(ToInstallDist(files=True))
|
|
to_install = [ToInstallDist(), ToInstallDist()]
|
|
temp_dir = self.mkdtemp()
|
|
|
|
self.assertRaises(Exception, install.install_from_infos,
|
|
install_path=temp_dir, install=to_install,
|
|
remove=remove)
|
|
# assert that the files are in the same place
|
|
# assert that the files have been removed
|
|
for dist in remove:
|
|
for f in dist.list_installed_files():
|
|
self.assertTrue(os.path.exists(f))
|
|
dist._unlink_installed_files()
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
install.uninstall = old_uninstall
|
|
|
|
def test_install_from_infos_install_succes(self):
|
|
old_install_dist = install._install_dist
|
|
install._install_dist = MagicMock([])
|
|
try:
|
|
# assert that the distribution can be installed
|
|
install_path = "my_install_path"
|
|
to_install = [ToInstallDist(), ToInstallDist()]
|
|
|
|
install.install_from_infos(install_path, install=to_install)
|
|
for dist in to_install:
|
|
install._install_dist.called_with(install_path)
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
|
|
def test_install_permission_denied(self):
|
|
# if we don't have access to the installation path, we should abort
|
|
# immediately
|
|
project = os.path.join(os.path.dirname(__file__), 'package.tgz')
|
|
|
|
# when running from an uninstalled build, a warning is emitted and the
|
|
# installation is not attempted
|
|
if is_python_build():
|
|
self.assertFalse(install.install(project))
|
|
self.assertEqual(1, len(self.get_logs(logging.ERROR)))
|
|
return
|
|
|
|
install_path = self.mkdtemp()
|
|
old_get_path = install.get_path
|
|
install.get_path = lambda path: install_path
|
|
old_mod = os.stat(install_path).st_mode
|
|
os.chmod(install_path, 0)
|
|
try:
|
|
self.assertFalse(install.install(project))
|
|
finally:
|
|
os.chmod(install_path, old_mod)
|
|
install.get_path = old_get_path
|
|
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(TestInstall))
|
|
return suite
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|