# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Author Tully Foote/tfoote@willowgarage.com, Ken Conley/kwc@willowgarage.com
import os
import subprocess
import traceback
from rospkg.os_detect import OsDetect
from .core import rd_debug, RosdepInternalError, InstallFailed, print_bold, InvalidData
# kwc: InstallerContext is basically just a bunch of dictionaries with
# defined lookup methods. It really encompasses two facets of a
# rosdep configuration: the pluggable nature of installers and
# platforms, as well as the resolution of the operating system for a
# specific machine. It is possible to decouple those two notions,
# though there are some touch points over how this interfaces with the
# rospkg.os_detect library, i.e. how platforms can tweak these
# detectors and how the higher-level APIs can override them.
class InstallerContext(object):
    """
    :class:`InstallerContext` manages the context of execution for rosdep as it
    relates to the installers, OS detectors, and other extensible
    APIs.

    Internally it keeps four registries:

    - ``installers``: installer key -> :class:`Installer` instance
    - ``os_installers``: OS key -> list of installer keys
    - ``default_os_installer``: OS key -> callable that is given the
      OS detector and returns the default installer key
    - ``os_version_type``: OS name -> callable that is given the OS
      detector and returns the version key (version number or codename)
    """

    def __init__(self, os_detect=None):
        """
        :param os_detect: (optional)
          :class:`rospkg.os_detect.OsDetect` instance to use for
          detecting platforms.  If `None`, default instance will be
          used.
        """
        # platform configuration
        self.installers = {}
        self.os_installers = {}
        self.default_os_installer = {}

        # stores configuration of which value to use for the OS version key (version number or codename)
        self.os_version_type = {}

        # OS detection and override
        if os_detect is None:
            os_detect = OsDetect()
        self.os_detect = os_detect
        self.os_override = None

        self.verbose = False

    def set_verbose(self, verbose):
        """
        Enable or disable the progress messages printed by the
        registration methods of this context.

        :param verbose: truthy to print messages
        """
        self.verbose = verbose

    def set_os_override(self, os_name, os_version):
        """
        Override the OS detector with *os_name* and *os_version*.  See
        :meth:`InstallerContext.detect_os`.

        :param os_name: OS name value to use, ``str``
        :param os_version: OS version value to use, ``str``
        """
        if self.verbose:
            print('overriding OS to [%s:%s]' % (os_name, os_version))
        self.os_override = os_name, os_version

    def get_os_version_type(self, os_name):
        """
        :param os_name: OS name to look up
        :returns: callable registered for *os_name* via
          :meth:`InstallerContext.set_os_version_type`, or
          ``OsDetect.get_version`` if none was registered.
        """
        return self.os_version_type.get(os_name, OsDetect.get_version)

    def set_os_version_type(self, os_name, version_type):
        """
        Register the callable used to compute the version key for *os_name*.

        :param os_name: OS name the callable applies to
        :param version_type: callable; it is invoked with the OS
          detector by :meth:`InstallerContext.get_os_name_and_version`
        :raises: :exc:`ValueError` if *version_type* is not callable
        """
        if not callable(version_type):
            raise ValueError('version type should be a method')
        self.os_version_type[os_name] = version_type

    def get_os_name_and_version(self):
        """
        Get the OS name and version key to use for resolution and
        installation.  This will be the detected OS name/version
        unless :meth:`InstallerContext.set_os_override()` has been
        called.

        :returns: (os_name, os_version), ``(str, str)``
        """
        if self.os_override:
            return self.os_override
        os_name = self.os_detect.get_name()
        # the version key (number vs. codename) is configurable per OS
        os_key = self.get_os_version_type(os_name)
        os_version = os_key(self.os_detect)
        return os_name, os_version

    def get_os_detect(self):
        """
        :returns os_detect: :class:`OsDetect` instance used for
          detecting platforms.
        """
        return self.os_detect

    def set_installer(self, installer_key, installer):
        """
        Set the installer to use for *installer_key*.  This will
        replace any existing installer associated with the key.
        *installer_key* should be the same key used for the
        ``rosdep.yaml`` package manager key.  If *installer* is
        ``None``, this will delete the installer currently associated
        with *installer_key* from this context.

        :param installer_key: key/name to associate with installer, ``str``
        :param installer: :class:`Installer` instance, or ``None`` to unregister.
        :raises: :exc:`TypeError` if *installer* is not an instance of
          :class:`Installer`
        :raises: :exc:`KeyError` if *installer* is ``None`` and nothing
          is registered for *installer_key*
        """
        if installer is None:
            del self.installers[installer_key]
            return
        if not isinstance(installer, Installer):
            raise TypeError('installer must be a instance of Installer')
        if self.verbose:
            print('registering installer [%s]' % (installer_key))
        self.installers[installer_key] = installer

    def get_installer(self, installer_key):
        """
        :returns: :class:`Installer` instance associated with *installer_key*.
        :raises: :exc:`KeyError` If no installer is associated with the key
        """
        return self.installers[installer_key]

    def get_installer_keys(self):
        """
        :returns: registered installer keys (the ``keys()`` of the
          internal installer registry)
        """
        return self.installers.keys()

    def get_os_keys(self):
        """
        :returns: OS keys that have registered with this context (the
          ``keys()`` of the internal OS registry)
        """
        return self.os_installers.keys()

    def add_os_installer_key(self, os_key, installer_key):
        """
        Register an installer for the specified OS.  This will fail
        with a :exc:`KeyError` if no :class:`Installer` can be found
        with the associated *installer_key*.

        :param os_key: Key for OS
        :param installer_key: Key for installer to add to OS
        :raises: :exc:`KeyError`: if installer for *installer_key*
          is not set.
        """
        # validate, will throw KeyError
        self.get_installer(installer_key)
        if self.verbose:
            print('add installer [%s] to OS [%s]' % (installer_key, os_key))
        if os_key in self.os_installers:
            self.os_installers[os_key].append(installer_key)
        else:
            self.os_installers[os_key] = [installer_key]

    def get_os_installer_keys(self, os_key):
        """
        Get list of installer keys registered for the specified OS.
        These keys can be resolved by calling
        :meth:`InstallerContext.get_installer`.

        :param os_key: Key for OS
        :returns: a copy of the list of installer keys, so callers may
          modify it freely
        :raises: :exc:`KeyError`: if no information for OS *os_key* is registered.
        """
        if os_key in self.os_installers:
            return self.os_installers[os_key][:]
        raise KeyError(os_key)

    def set_default_os_installer_key(self, os_key, installer_key):
        """
        Set the default OS installer to use for OS.
        :meth:`InstallerContext.add_os_installer_key` must have
        previously been called for *os_key* with the key that
        *installer_key* evaluates to.

        :param os_key: Key for OS
        :param installer_key: callable that is passed this context's
          OS detector and returns the key of the installer to use as
          the default
        :raises: :exc:`KeyError`: if the resolved installer key is not
          associated with *os_key* or if OS for *os_key* has no
          associated installers.
        :raises: :exc:`ValueError`: if *installer_key* is not callable
        """
        if os_key not in self.os_installers:
            raise KeyError('unknown OS: %s' % (os_key))
        if not callable(installer_key):
            raise ValueError('installer_key should be a callable that returns the installer key')
        # evaluate once; the result is used for validation and messages only,
        # the callable itself is what gets stored
        resolved_key = installer_key(self.os_detect)
        if resolved_key not in self.os_installers[os_key]:
            raise KeyError('installer [%s] is not associated with OS [%s]. call add_os_installer_key() first' % (resolved_key, os_key))
        if self.verbose:
            print('set default installer [%s] for OS [%s]' % (resolved_key, os_key,))
        self.default_os_installer[os_key] = installer_key

    def get_default_os_installer_key(self, os_key):
        """
        Get the default OS installer key to use for OS, or ``None`` if
        there is no default.

        ``None`` is also returned when the stored default evaluates to
        a key that is not registered for *os_key*, or that has no
        installer registered in this context.

        :param os_key: Key for OS
        :returns: installer key, or ``None``
        :raises: :exc:`KeyError`: if no information for OS *os_key* is registered.
        """
        if os_key not in self.os_installers:
            raise KeyError('unknown OS: %s' % (os_key))
        try:
            installer_key = self.default_os_installer[os_key](self.os_detect)
            if installer_key not in self.os_installers[os_key]:
                # an unassociated default is reported as "no default"
                return None
            # validate, will throw KeyError
            self.get_installer(installer_key)
            return installer_key
        except KeyError:
            return None
class Installer(object):
    """
    The :class:`Installer` API is designed around opaque *resolved*
    parameters. These parameters can be any type of sequence object,
    but they must obey set arithmetic.  They should also implement
    ``__str__()`` methods so they can be pretty printed.

    This base class only defines the interface: every method except
    :meth:`Installer.get_depends` raises :exc:`NotImplementedError`.
    """

    def is_installed(self, resolved_item):
        """
        :param resolved_item: resolved installation item. NOTE: this is a single item,
          not a list of items like the other APIs, ``opaque``.
        :returns: ``True`` if the *resolved_item* is installed on
          the local system
        """
        raise NotImplementedError('is_installed', resolved_item)

    def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
        """
        :param resolved: list of resolved installation items, ``[opaque]``
        :param interactive: If `False`, disable interactive prompts,
          e.g. Pass through ``-y`` or equivalant to package manager.
        :param reinstall: If `True`, install everything even if already installed
        :param quiet: quiet flag forwarded by the caller; its exact
          effect is up to the implementing subclass
        """
        # the label names this method so the error points at what must be overridden
        raise NotImplementedError('get_install_command', resolved, interactive, reinstall, quiet)

    def get_depends(self, rosdep_args):
        """
        :returns: list of dependencies on other rosdep keys.  Only
          necessary if the package manager doesn't handle
          dependencies.
        """
        return []  # Default return empty list

    def resolve(self, rosdep_args_dict):
        """
        :param rosdep_args_dict: argument dictionary to the rosdep rule for this package manager
        :returns: [resolutions].  resolved objects should be printable to a user, but are otherwise opaque.
        """
        raise NotImplementedError('Base class resolve', rosdep_args_dict)

    def unique(self, *resolved_rules):
        """
        Combine the resolved rules into a unique list.  This
        is meant to combine the results of multiple calls to
        :meth:`PackageManagerInstaller.resolve`.

        Example::

            resolved1 = installer.resolve(args1)
            resolved2 = installer.resolve(args2)
            resolved = installer.unique(resolved1, resolved2)

        :param resolved_rules: resolved arguments.  Resolved
          arguments must all be from this :class:`Installer` instance.
        """
        raise NotImplementedError('Base class unique', resolved_rules)
class PackageManagerInstaller(Installer):
    """
    General form of a package manager :class:`Installer`
    implementation that assumes:

     - installer rosdep args spec is a list of package names stored with the key "packages"
     - a detect function exists that can return a list of packages that are installed

    Also, if *supports_depends* is set to ``True``:

     - installer rosdep args spec can also include dependency specification with the key "depends"
    """

    def __init__(self, detect_fn, supports_depends=False):
        """
        :param supports_depends: package manager supports dependency key
        :param detect_fn: function that for a given list of packages determines
          the list of installed packages.
        """
        self.detect_fn = detect_fn
        self.supports_depends = supports_depends
        self.as_root = True
        # only prefix with sudo when the platform exposes an effective uid
        # and the process is not already running as uid 0
        self.sudo_command = 'sudo -H' if hasattr(os, 'geteuid') and os.geteuid() != 0 else ''

    def elevate_priv(self, cmd):
        """
        Prepend *self.sudo_command* to the command if *self.as_root* is ``True``.

        :param list cmd: list of strings comprising the command
        :returns: a list of commands
        """
        return (self.sudo_command.split() if self.as_root else []) + cmd

    def resolve(self, rosdep_args):
        """
        See :meth:`Installer.resolve()`

        Accepted forms of *rosdep_args*:

        - ``dict``: the value stored under ``"packages"`` (``[]`` if
          absent); a string value is split on whitespace
        - ``str``: split on whitespace
        - ``list``: returned as is

        :raises: :exc:`InvalidData` for any other type
        """
        if isinstance(rosdep_args, dict):
            packages = rosdep_args.get('packages', [])
            if isinstance(packages, str):
                packages = packages.split()
        elif isinstance(rosdep_args, str):
            # split() rather than split(' '): repeated, leading or trailing
            # spaces must not produce empty package names
            packages = rosdep_args.split()
        elif isinstance(rosdep_args, list):
            packages = rosdep_args
        else:
            raise InvalidData('Invalid rosdep args: %s' % (rosdep_args))
        return packages

    def unique(self, *resolved_rules):
        """
        See :meth:`Installer.unique()`

        :returns: sorted list of the distinct items found in all of
          *resolved_rules*
        """
        s = set()
        for resolved in resolved_rules:
            s.update(resolved)
        return sorted(s)

    def get_packages_to_install(self, resolved, reinstall=False):
        """
        Return a list of packages (out of *resolved*) that still need to get
        installed.

        :param resolved: list of resolved package names
        :param reinstall: If ``True``, *resolved* is returned unchanged
          without consulting the detect function
        """
        if reinstall:
            return resolved
        if not resolved:
            return []
        detected = self.detect_fn(resolved)
        return [x for x in resolved if x not in detected]

    def is_installed(self, resolved_item):
        """
        Check if a given package was installed.

        :param resolved_item: a single resolved package
        :returns: ``True`` if the detect function reports the item as installed
        """
        return not self.get_packages_to_install([resolved_item])

    def get_version_strings(self):
        """
        Return a list of version information strings.

        Where each string is of the form "<installer> <version string>".
        For example, ["apt-get x.y.z"] or ["pip x.y.z", "setuptools x.y.z"].
        """
        raise NotImplementedError('subclasses must implement get_version_strings method')

    def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
        """
        See :meth:`Installer.get_install_command()`.  Must be
        implemented by subclasses.
        """
        raise NotImplementedError('subclasses must implement', resolved, interactive, reinstall, quiet)

    def get_depends(self, rosdep_args):
        """
        :returns: list of dependencies on other rosdep keys.  Only
          necessary if the package manager doesn't handle
          dependencies.
        """
        if self.supports_depends and isinstance(rosdep_args, dict):
            return rosdep_args.get('depends', [])
        return []  # Default return empty list
def normalize_uninstalled_to_list(uninstalled):
    """
    Flatten *uninstalled* into a single list of strings.

    :param uninstalled: iterable of ``(key, value)`` pairs where each
      value is either a list of items or a single item
    :returns: list with ``str()`` of every item, in input order, ``[str]``
    """
    uninstalled_dependencies = []
    for _installer_key, pkg_or_list in uninstalled:
        if isinstance(pkg_or_list, list):
            uninstalled_dependencies.extend(str(pkg) for pkg in pkg_or_list)
        else:
            # a lone item is appended as itself
            uninstalled_dependencies.append(str(pkg_or_list))
    return uninstalled_dependencies
class RosdepInstaller(object):
    """
    Drives resolution and installation of rosdep keys using an
    installer context (which supplies the installers) and a lookup
    object (which supplies ``resolve_all``).
    """

    def __init__(self, installer_context, lookup):
        """
        :param installer_context: object providing ``get_installer(installer_key)``
        :param lookup: object providing
          ``resolve_all(resources, installer_context, implicit=...)``
        """
        self.installer_context = installer_context
        self.lookup = lookup

    def get_uninstalled(self, resources, implicit=False, verbose=False):
        """
        Get list of system dependencies that have not been installed
        as well as a list of errors from performing the resolution.
        This is a bulk API in order to provide performance
        optimizations in checking install state.

        :param resources: List of resource names (e.g. ROS package names), ``[str]]``
        :param implicit: Install implicit (recursive) dependencies of
            resources.  Default ``False``.
        :param verbose: If ``True``, print resolution progress to screen.

        :returns: (uninstalled, errors).  *uninstalled* is a list of
          ``(installer_key, packages_to_install)`` tuples, containing
          only installers that have something left to install.
          *errors* is passed through unchanged from the lookup's
          ``resolve_all``.
        :raises: :exc:`RosdepInternalError`
        """
        installer_context = self.installer_context

        # resolutions have been unique()d
        if verbose:
            print('resolving for resources [%s]' % (', '.join(resources)))
        resolutions, errors = self.lookup.resolve_all(resources, installer_context, implicit=implicit)

        # for each installer, figure out what is left to install
        uninstalled = []
        if resolutions == []:
            return uninstalled, errors
        for installer_key, resolved in resolutions:  # py3k
            if verbose:
                print('resolution: %s [%s]' % (installer_key, ', '.join([str(r) for r in resolved])))
            try:
                installer = installer_context.get_installer(installer_key)
            except KeyError as e:  # lookup has to be buggy to cause this
                raise RosdepInternalError(e)
            try:
                packages_to_install = installer.get_packages_to_install(resolved)
            except Exception as e:
                rd_debug(traceback.format_exc())
                raise RosdepInternalError(e, message='Bad installer [%s]: %s' % (installer_key, e))

            # only create key if there is something to do
            if packages_to_install:
                uninstalled.append((installer_key, packages_to_install))
            if verbose:
                print('uninstalled: [%s]' % (', '.join([str(p) for p in packages_to_install])))

        return uninstalled, errors

    def install(self, uninstalled, interactive=True, simulate=False,
                continue_on_error=False, reinstall=False, verbose=False, quiet=False):
        """
        Install the uninstalled rosdeps.  This API is for the bulk
        workflow of rosdep (see example below).  For a more targeted
        install API, see :meth:`RosdepInstaller.install_resolved`.

        :param uninstalled: uninstalled value from
          :meth:`RosdepInstaller.get_uninstalled`.  Value is a
          list of ``(installer_key, resolved)`` tuples.
        :param interactive: If ``False``, suppress
          interactive prompts (e.g. by passing '-y' to ``apt``).
        :param simulate: If ``True`` simulate installation
          without actually executing.
        :param continue_on_error: If ``True``, continue installation
          even if an install fails.  Otherwise, stop after first
          installation failure.
        :param reinstall: If ``True``, install dependencies if even
          already installed (default ``False``).
        :param verbose: If ``True``, print verbose output to screen (default ``False``)
        :param quiet: forwarded to :meth:`RosdepInstaller.install_resolved`

        :raises: :exc:`InstallFailed` if any rosdeps fail to install
          and *continue_on_error* is ``False``.
        :raises: :exc:`KeyError` If *uninstalled* value has invalid
          installer keys

        Example::

            uninstalled, errors = installer.get_uninstalled(packages)
            installer.install(uninstalled)
        """
        if verbose:
            print(
                'install options: reinstall[%s] simulate[%s] interactive[%s]' %
                (reinstall, simulate, interactive)
            )
            uninstalled_list = normalize_uninstalled_to_list(uninstalled)
            print('install: uninstalled keys are %s' % ', '.join(uninstalled_list))

        # Squash uninstalled again, in case some dependencies were already installed.
        # Only *consecutive* entries with the same installer key are merged, so the
        # relative order of installers is preserved.
        squashed_uninstalled = []
        previous_installer_key = None
        for installer_key, resolved in uninstalled:
            if previous_installer_key != installer_key:
                squashed_uninstalled.append((installer_key, []))
                previous_installer_key = installer_key
            squashed_uninstalled[-1][1].extend(resolved)

        failures = []
        for installer_key, resolved in squashed_uninstalled:
            try:
                self.install_resolved(installer_key, resolved, simulate=simulate,
                                      interactive=interactive, reinstall=reinstall, continue_on_error=continue_on_error,
                                      verbose=verbose, quiet=quiet)
            except InstallFailed as e:
                if not continue_on_error:
                    raise
                # accumulate errors
                failures.extend(e.failures)
        if failures:
            raise InstallFailed(failures=failures)

    @staticmethod
    def _print_simulated_commands(installer_key, command):
        """Print the commands that would be run for *installer_key* without executing them."""
        print('#[%s] Installation commands:' % (installer_key))
        for sub_command in command:
            if isinstance(sub_command[0], list):
                sub_cmd_len = len(sub_command)
                for i, cmd in enumerate(sub_command):
                    print(" '%s' (alternative %d/%d)" % (' '.join(cmd), i + 1, sub_cmd_len))
            else:
                print(' ' + ' '.join(sub_command))

    @staticmethod
    def _run_command(command, installer_key, failures, verbose):
        """Run one command, append a failure record to *failures* on non-zero exit, return the exit code."""
        # always echo commands to screen
        print_bold('executing command [%s]' % ' '.join(command))
        result = subprocess.call(command)
        if verbose:
            print('command return code [%s]: %s' % (' '.join(command), result))
        if result != 0:
            failures.append((installer_key, 'command [%s] failed' % (' '.join(command))))
        return result

    def install_resolved(self, installer_key, resolved, simulate=False, interactive=True,
                         reinstall=False, continue_on_error=False, verbose=False, quiet=False):
        """
        Lower-level API for installing a rosdep dependency.  The
        rosdep keys have already been resolved to *installer_key* and
        *resolved* via :exc:`RosdepLookup` or other means.

        :param installer_key: Key for installer to apply to *resolved*, ``str``
        :param resolved: Opaque resolution list from :class:`RosdepLookup`.
        :param interactive: If ``True``, allow interactive prompts (default ``True``)
        :param simulate: If ``True``, don't execute installation commands, just print to screen.
        :param reinstall: If ``True``, install dependencies if even
          already installed (default ``False``).
        :param continue_on_error: If ``True``, keep running the
          remaining commands after one fails; failures are still
          reported at the end.
        :param verbose: If ``True``, print verbose output to screen (default ``False``)
        :param quiet: If ``True``, supress output except for errors (default ``False``)

        :raises: :exc:`InstallFailed` if any of *resolved* fail to install.
        :raises: :exc:`KeyError` if no installer is registered for *installer_key*
        """
        installer_context = self.installer_context
        installer = installer_context.get_installer(installer_key)
        command = installer.get_install_command(resolved, interactive=interactive, reinstall=reinstall, quiet=quiet)
        if not command:
            if verbose:
                print('#No packages to install')
            return

        if simulate:
            self._print_simulated_commands(installer_key, command)
            # nothing left to do for simulation
            return

        # run each install command set and collect errors
        failures = []
        for sub_command in command:
            if isinstance(sub_command[0], list):  # list of alternatives
                alt_failures = []
                for alt_command in sub_command:
                    result = self._run_command(alt_command, installer_key, alt_failures, verbose)
                    if result == 0:  # one successful command is sufficient
                        alt_failures = []  # clear failures from other alternatives
                        break
                failures.extend(alt_failures)
            else:
                result = self._run_command(sub_command, installer_key, failures, verbose)
            if result != 0:
                if not continue_on_error:
                    raise InstallFailed(failures=failures)

        # test installation of each
        for r in resolved:
            if not installer.is_installed(r):
                failures.append((installer_key, 'Failed to detect successful installation of [%s]' % (r)))

        # finalize result
        if failures:
            raise InstallFailed(failures=failures)
        elif verbose:
            print('#successfully installed')