404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.191.160.52: ~ $
import abc
import copy
import logging
import re
from os.path import exists
from typing import Any, Dict, List, Optional, Tuple, Union

from uaclient import (
    apt,
    contract,
    event_logger,
    exceptions,
    http,
    messages,
    system,
    util,
)
from uaclient.entitlements import base
from uaclient.entitlements.entitlement_status import (
    ApplicationStatus,
    CanDisableFailure,
    CanDisableFailureReason,
)

# Module-wide event logger; used below for user-facing progress messages
# (event.info) and to record reboot requirements (event.needs_reboot).
event = event_logger.get_event_logger()
# Logger for this module; its name is derived from __name__ through
# util.replace_top_level_logger_name.
LOG = logging.getLogger(util.replace_top_level_logger_name(__name__))

# See 'What does a specific Ubuntu kernel version number mean?' in
# https://wiki.ubuntu.com/Kernel/FAQ
# Matches a complete kernel image package name and captures everything after
# the "linux-image-" prefix, e.g. "5.15.0-91-generic".
RE_KERNEL_PKG = r"^linux-image-([\d]+[.-][\d]+[.-][\d]+-[\d]+-[A-Za-z0-9_-]+)$"


class RepoEntitlement(base.UAEntitlement):
    """Entitlement whose service is delivered through an apt repository.

    Implements the shared enable/disable flow for such services: setting up
    and removing the authenticated apt source and the optional pin file,
    installing the packages listed by the contract and, when disabling with
    purge, reinstalling or removing packages that came from the repo origin.
    """

    # File path templates, formatted with the entitlement name
    repo_list_file_tmpl = "/etc/apt/sources.list.d/ubuntu-{name}.list"
    repo_pref_file_tmpl = "/etc/apt/preferences.d/ubuntu-{name}"
    # Template formatted with the contract's aptURL to build the repo URL
    repo_url_tmpl = "{}/ubuntu"

    # The repo Origin value defined in apt metadata
    origin = None  # type: Optional[str]

    # GH: #1084 call apt in noninteractive mode
    apt_noninteractive = False

    # Check if the requested packages are installed to inform if
    # the service is enabled or not
    check_packages_are_installed = False

    # RepoEntitlements can be purged, unless specifically stated
    supports_purge = True

    # Subclasses may override this to have the repository pinned
    @property
    def repo_pin_priority(self) -> Union[int, str, None]:
        """Apt pin priority for this repo; None means no pinning is set up."""
        return None

    @property
    def packages(self) -> List[str]:
        """Package names the contract asks to install on enablement.

        Taken from the ``additionalPackages`` directive of the entitlement.
        A shallow copy is returned, so changing the result does not touch
        the list held in the entitlement configuration.
        """
        entitlement = self.entitlement_cfg.get("entitlement", {})
        if not entitlement:
            return []

        directives = entitlement.get("directives", {})
        return copy.copy(directives.get("additionalPackages", []))

    def _check_for_reboot(self) -> bool:
        """Tell whether a reboot is required for this service's packages.

        The answer is also recorded on the module event logger.
        """
        needs_reboot = system.should_reboot(installed_pkgs=set(self.packages))
        event.needs_reboot(needs_reboot)
        return needs_reboot

    @property
    @abc.abstractmethod
    def repo_key_file(self) -> str:
        """Key file value handed to apt when adding or removing the repo.

        Abstract: every concrete entitlement has to provide it.
        """

    def can_disable(
        self, ignore_dependent_services: bool = False
    ) -> Tuple[bool, Optional[CanDisableFailure]]:
        """Decide whether this entitlement may be disabled right now.

        The parent class check runs first. On top of it, a purge request is
        refused when the entitlement defines no apt origin.

        :param ignore_dependent_services: Forwarded unchanged to the parent
            implementation.

        :return: Tuple of (can disable, failure details or None).
        """
        parent_verdict = super().can_disable(
            ignore_dependent_services=ignore_dependent_services
        )
        allowed, failure = parent_verdict
        if allowed is False:
            return allowed, failure

        if not self.origin and self.purge:
            # NOTE(review): entitlement_name is filled with self.title here,
            # while setup_apt_config passes self.name for the same keyword
            # - confirm which one the message expects.
            no_origin_failure = CanDisableFailure(
                CanDisableFailureReason.NO_PURGE_WITHOUT_ORIGIN,
                messages.REPO_PURGE_FAIL_NO_ORIGIN.format(
                    entitlement_name=self.title, title=self.title
                ),
            )
            return False, no_origin_failure

        return allowed, failure

    def _perform_enable(self, silent: bool = False) -> bool:
        """Set up apt for this entitlement and install its packages.

        In access-only mode (supported by the service and requested), the
        package installation is skipped and only reported to the user.

        :param silent: Forwarded to setup_apt_config.

        @return: True; failures surface as exceptions.
        @raises: UbuntuProError on failure to install suggested packages
        """
        self.setup_apt_config(silent=silent)

        access_only_mode = self.supports_access_only and self.access_only
        if not access_only_mode:
            self.install_packages()
            event.info(messages.ENABLED_TMPL.format(title=self.title))
            self._check_for_reboot_msg(operation="install")
            return True

        if self.packages:
            skipped = " ".join(self.packages)
            event.info(
                messages.SKIPPING_INSTALLING_PACKAGES.format(packages=skipped)
            )
        event.info(messages.ACCESS_ENABLED_TMPL.format(title=self.title))
        return True

    def _perform_disable(self, silent=False):
        """Disable the service, optionally purging its packages.

        When purge is requested and an origin is defined, the packages
        installed from that origin are split in two groups: those with a
        version available from another origin (reinstalled at the highest
        such version) and those without (removed). The user has to confirm
        before any change is made.

        :param silent: Forwarded to remove_apt_config.

        :return: True when the disable went through; False when the kernel
            check or the purge confirmation stopped the operation.
        """
        if self.purge and self.origin:
            print(messages.PURGE_EXPERIMENTAL)
            print()

            origin_packages = apt.get_installed_packages_by_origin(
                self.origin
            )

            if not self.purge_kernel_check(origin_packages):
                return False

            packages_to_reinstall = []
            packages_to_remove = []
            for pkg in origin_packages:
                other_versions = apt.get_remote_versions_for_package(
                    pkg, exclude_origin=self.origin
                )
                if not other_versions:
                    packages_to_remove.append(pkg)
                    continue
                # max() works on a list of Version objects, mypy just does
                # not know about it.
                best_version = max(other_versions)  # type: ignore
                packages_to_reinstall.append((pkg, best_version))

            confirmed = self.prompt_for_purge(
                packages_to_remove, packages_to_reinstall
            )
            if not confirmed:
                return False

        # Not every entitlement defines remove_packages
        if hasattr(self, "remove_packages"):
            self.remove_packages()
        self.remove_apt_config(silent=silent)

        # The repo configuration is gone at this point; now act on the
        # package lists computed above.
        if self.purge and self.origin:
            self.execute_reinstall(packages_to_reinstall)
            self.execute_removal(packages_to_remove)
        return True

    def purge_kernel_check(self, package_list):
        """
        Check whether the purge operation involves a kernel.

        When packages named 'linux-image-<version>' are in the package list,
        warn the user that kernels are being removed and show which kernel
        is currently running.

        If no installed Ubuntu kernel would be left untouched by the
        operation, return False to abort it. Otherwise ask the user for
        confirmation and return False when it is denied.

        :param package_list: Packages (objects with a ``name``) selected for
            the purge.

        :return: True when the purge may proceed, False otherwise.
        """
        kernel_matches = (
            re.search(RE_KERNEL_PKG, pkg.name) for pkg in package_list
        )
        kernels_to_purge = [m.group(1) for m in kernel_matches if m]

        # No kernel involved: nothing to warn about
        if not kernels_to_purge:
            return True

        print(messages.PURGE_KERNEL_REMOVAL.format(service=self.title))
        print(" ".join(kernels_to_purge))

        running_kernel = system.get_kernel_info().uname_release
        print(
            messages.PURGE_CURRENT_KERNEL.format(kernel_version=running_kernel)
        )

        # Installed Ubuntu kernels that this operation does not touch
        remaining_kernels = [
            version
            for version in system.get_installed_ubuntu_kernels()
            if version not in kernels_to_purge
        ]
        if not remaining_kernels:
            print(messages.PURGE_NO_ALTERNATIVE_KERNEL)
            return False

        if util.prompt_for_confirmation(messages.PURGE_KERNEL_CONFIRMATION):
            return True
        return False

    def prompt_for_purge(self, packages_to_remove, packages_to_reinstall):
        """Show the packages affected by the purge and ask to proceed.

        :param packages_to_remove: Packages (objects with a ``name``) that
            are going to be removed.
        :param packages_to_reinstall: (package, version) pairs that are
            going to be reinstalled.

        :return: The confirmation result when at least one package was
            listed; True when there is nothing to confirm.
        """
        if packages_to_remove:
            print(messages.WARN_PACKAGES_REMOVAL)
            removal_names = [pkg.name for pkg in packages_to_remove]
            util.print_package_list(removal_names)

        if packages_to_reinstall:
            print(messages.WARN_PACKAGES_REINSTALL)
            reinstall_names = [
                pkg.name for pkg, _version in packages_to_reinstall
            ]
            util.print_package_list(reinstall_names)

        if not (packages_to_remove or packages_to_reinstall):
            return True
        return util.prompt_for_confirmation(messages.PROCEED_YES_NO)

    def execute_removal(self, packages_to_remove):
        """Purge the given packages that are still installed.

        :param packages_to_remove: Packages (objects with a ``name``)
            selected for removal.
        """
        # We need to check again if the package is installed, because there is
        # an intermediate step between listing the packages and acting on them.
        # Some reinstalls may also uninstall dependencies.
        # Packages may be removed between those operations.
        installed_names = set(apt.get_installed_packages_names())
        to_remove = [
            package.name
            for package in packages_to_remove
            if package.name in installed_names
        ]
        if not to_remove:
            return

        apt.purge_packages(
            to_remove,
            # Join the names so the failure message lists the packages as
            # plain text instead of a Python list repr.
            messages.UNINSTALLING_PACKAGES_FAILED.format(
                packages=", ".join(to_remove)
            ),
        )

    def execute_reinstall(self, packages_to_reinstall):
        """Reinstall still-installed packages at the selected versions.

        :param packages_to_reinstall: (package, version) pairs; each one is
            turned into a "name=version" spec for apt.
        """
        # The installed state is checked again because packages may have
        # been removed between building the list and acting on it.
        currently_installed = apt.get_installed_packages_names()
        pinned_specs = []
        for pkg, version in packages_to_reinstall:
            if pkg.name in currently_installed:
                pinned_specs.append("{}={}".format(pkg.name, version.ver_str))

        if pinned_specs:
            apt.reinstall_packages(pinned_specs)

    def application_status(
        self,
    ) -> Tuple[ApplicationStatus, Optional[messages.NamedMessage]]:
        """Report whether this repo-based service is currently applied.

        The service counts as enabled when the contract's aptURL, expanded
        through ``repo_url_tmpl``, is found in the apt cache policy. When
        ``check_packages_are_installed`` is set, every package listed in
        ``self.packages`` must be installed as well.

        :return: Tuple of the ApplicationStatus and a message explaining it.
        """
        current_status = (
            ApplicationStatus.DISABLED,
            messages.SERVICE_NOT_CONFIGURED.format(title=self.title),
        )

        entitlement_cfg = self.entitlement_cfg
        directives = entitlement_cfg.get("entitlement", {}).get(
            "directives", {}
        )
        repo_url = directives.get("aptURL")
        if not repo_url:
            return (
                ApplicationStatus.DISABLED,
                messages.NO_APT_URL_FOR_SERVICE.format(title=self.title),
            )
        policy = apt.get_apt_cache_policy(error_msg=messages.APT_POLICY_FAILED)
        # The aptURL is literal text, not a pattern: escape it so that "."
        # and any other regex metacharacter only match themselves.
        repo_pattern = self.repo_url_tmpl.format(re.escape(repo_url))
        if re.search(repo_pattern, policy):
            current_status = (
                ApplicationStatus.ENABLED,
                messages.SERVICE_IS_ACTIVE.format(title=self.title),
            )

        # A missing package overrides whatever the repo check concluded
        if self.check_packages_are_installed:
            for package in self.packages:
                if not apt.is_installed(package):
                    return (
                        ApplicationStatus.DISABLED,
                        messages.SERVICE_DISABLED_MISSING_PACKAGE.format(
                            service=self.name, package=package
                        ),
                    )

        return current_status

    def _check_apt_url_is_applied(self, apt_url):
        """Check whether the apt sources file already reflects ``apt_url``.

        :param apt_url: string containing the apt url from the contract
            delta; falsy when the delta carries no apt url.

        :return: False when the sources file needs to be regenerated, that
                 is when every line in it is commented out or when apt_url
                 is given but not found in the file.
                 True when the file is active and there is either no apt url
                 delta or the url is already present in it.
        """
        apt_file = self.repo_list_file_tmpl.format(name=self.name)
        # Read the file once and use the same content for both checks
        content = system.load_file(apt_file)

        # If the apt file is commented out, we will assume that we need
        # to regenerate the apt file, regardless of the apt url delta
        if all(line.startswith("#") for line in content.strip().split("\n")):
            return False

        # If the file is not commented out and we don't have delta,
        # we will not do anything
        if not apt_url:
            return True

        # If the delta is already in the file, we won't reconfigure it
        # again
        return apt_url in content

    def process_contract_deltas(
        self,
        orig_access: Dict[str, Any],
        deltas: Dict[str, Any],
        allow_enable: bool = False,
    ) -> bool:
        """Apply contract access changes that affect this entitlement.

        After the parent class had its chance to handle the deltas, a
        service that is not disabled gets its apt sources rewritten when
        the apt url is not applied yet, and any newly listed additional
        packages installed.

        :param orig_access: Dictionary containing the original
            resourceEntitlement access details.
        :param deltas: Dictionary which contains only the changed access keys
            and values.
        :param allow_enable: Boolean forwarded to the parent implementation,
            set True if allowed to perform the enable operation.

        :return: True when delta operations are processed; False when noop.
        """
        if super().process_contract_deltas(orig_access, deltas, allow_enable):
            # The parent class already took care of these deltas
            return True

        delta_directives = deltas.get("entitlement", {}).get("directives", {})
        new_apt_url = delta_directives.get("aptURL")
        new_packages = delta_directives.get("additionalPackages")

        cached_status = self.cfg.read_cache("status-cache")
        if delta_directives and cached_status:
            current_status = self._check_application_status_on_cache()
        else:
            current_status = self.application_status()[0]

        if current_status == ApplicationStatus.DISABLED:
            return False

        if not self._check_apt_url_is_applied(new_apt_url):
            LOG.info(
                "New aptURL, updating %s apt sources list to %s",
                self.name,
                new_apt_url,
            )
            event.info(
                messages.REPO_UPDATING_APT_SOURCES.format(service=self.name)
            )

            previous_directives = orig_access.get("entitlement", {}).get(
                "directives", {}
            )
            previous_url = previous_directives.get("aptURL")
            if previous_url:
                # Drop the original aptURL and its auth before rewriting
                apt.remove_auth_apt_repo(
                    self.repo_list_file_tmpl.format(name=self.name),
                    previous_url,
                )

            self.remove_apt_config()
            self.setup_apt_config()

        if new_packages:
            LOG.info("New additionalPackages, installing %r", new_packages)
            event.info(
                messages.REPO_REFRESH_INSTALLING_PACKAGES.format(
                    packages=", ".join(new_packages)
                )
            )
            self.install_packages(package_list=new_packages)

        return True

    def install_packages(
        self,
        package_list: Optional[List[str]] = None,
        cleanup_on_failure: bool = True,
        verbose: bool = True,
    ) -> None:
        """Install the packages the contract recommends for this service.

        Nothing is installed when there are no packages to install or when
        the "pre_install" message operations do not pass.

        :param package_list: Optional package list to use instead of
            self.packages.
        :param cleanup_on_failure: Remove the apt configuration when the
            sources update or the apt install fails.
        :param verbose: If true, print messages to stdout

        :raise UbuntuProError: re-raised from the sources update or the apt
            install, after the optional cleanup.
        """
        packages_to_install = package_list or self.packages
        if not packages_to_install:
            return

        pre_install_ops = self.messaging.get("pre_install", [])
        if not util.handle_message_operations(pre_install_ops):
            return

        try:
            self._update_sources_list()
        except exceptions.UbuntuProError:
            if cleanup_on_failure:
                self.remove_apt_config()
            raise

        if verbose:
            event.info(
                messages.INSTALLING_SERVICE_PACKAGES.format(title=self.title)
            )

        # Interactive defaults, replaced below for noninteractive services
        override_env_vars = None
        apt_options = []
        if self.apt_noninteractive:
            override_env_vars = {"DEBIAN_FRONTEND": "noninteractive"}
            apt_options = [
                "--allow-downgrades",
                '-o Dpkg::Options::="--force-confdef"',
                '-o Dpkg::Options::="--force-confold"',
            ]

        try:
            apt.run_apt_install_command(
                packages=packages_to_install,
                apt_options=apt_options,
                override_env_vars=override_env_vars,
            )
        except exceptions.UbuntuProError:
            event.info(messages.ENABLE_FAILED.format(title=self.title))
            if cleanup_on_failure:
                self.remove_apt_config()
            raise

    def setup_apt_config(self, silent: bool = False) -> None:
        """Setup apt config based on the resourceToken and directives.
        Also sets up apt proxy if necessary.

        The steps run in this order: apt proxy setup, credential (token)
        selection, directive validation, optional repo pinning, installation
        of missing prerequisite packages, adding the authenticated repo and
        finally updating the lists of the new source.

        :param silent: When True, the informational messages sent through
            the event logger are not emitted.

        :raise UbuntuProError: on failure to setup any aspect of this apt
           configuration
        """
        http_proxy = None  # type: Optional[str]
        https_proxy = None  # type: Optional[str]
        scope = None  # type: Optional[apt.AptProxyScope]
        # Global apt proxy settings are checked first; the uaclient-scoped
        # ones are only considered when no global proxy is configured.
        if self.cfg.global_apt_http_proxy or self.cfg.global_apt_https_proxy:
            http_proxy = http.validate_proxy(
                "http",
                self.cfg.global_apt_http_proxy,
                http.PROXY_VALIDATION_APT_HTTP_URL,
            )
            https_proxy = http.validate_proxy(
                "https",
                self.cfg.global_apt_https_proxy,
                http.PROXY_VALIDATION_APT_HTTPS_URL,
            )
            scope = apt.AptProxyScope.GLOBAL
        elif self.cfg.ua_apt_http_proxy or self.cfg.ua_apt_https_proxy:
            http_proxy = http.validate_proxy(
                "http",
                self.cfg.ua_apt_http_proxy,
                http.PROXY_VALIDATION_APT_HTTP_URL,
            )
            https_proxy = http.validate_proxy(
                "https",
                self.cfg.ua_apt_https_proxy,
                http.PROXY_VALIDATION_APT_HTTPS_URL,
            )
            scope = apt.AptProxyScope.UACLIENT

        # Called unconditionally, with all values None when no proxy is
        # configured.
        apt.setup_apt_proxy(
            http_proxy=http_proxy, https_proxy=https_proxy, proxy_scope=scope
        )
        repo_filename = self.repo_list_file_tmpl.format(name=self.name)
        resource_cfg = self.entitlement_cfg
        # Direct indexing: a missing "entitlement" key raises KeyError here
        directives = resource_cfg["entitlement"].get("directives", {})
        obligations = resource_cfg["entitlement"].get("obligations", {})
        # Credential selection: resourceToken from the entitlement config,
        # then one fetched from the contract server, then the machine token.
        token = resource_cfg.get("resourceToken")
        if not token:
            machine_token = self.cfg.machine_token["machineToken"]
            if not obligations.get("enableByDefault"):
                # services that are not enableByDefault need to obtain specific
                # resource access for tokens. We want to refresh this every
                # enable call because it is not refreshed by `pro refresh`.
                client = contract.UAContractClient(self.cfg)
                machine_access = client.get_resource_machine_access(
                    machine_token, self.name
                )
                if machine_access:
                    token = machine_access.get("resourceToken")
            if not token:
                token = machine_token
                LOG.warning(
                    "No resourceToken present in contract for service %s."
                    " Using machine token as credentials",
                    self.title,
                )
        # The aptKey, aptURL and suites directives are all mandatory
        aptKey = directives.get("aptKey")
        if not aptKey:
            raise exceptions.RepoNoAptKey(entitlement_name=self.name)
        repo_url = directives.get("aptURL")
        if not repo_url:
            raise exceptions.MissingAptURLDirective(entitlement_name=self.name)
        repo_suites = directives.get("suites")
        if not repo_suites:
            raise exceptions.RepoNoSuites(entitlement_name=self.name)
        # Pinning is only set up when the subclass defines a priority, and
        # it needs the repo origin to be defined too.
        if self.repo_pin_priority:
            if not self.origin:
                raise exceptions.RepoPinFailNoOrigin(
                    entitlement_name=self.name,
                    title=self.title,
                )
            repo_pref_file = self.repo_pref_file_tmpl.format(name=self.name)
            apt.add_ppa_pinning(
                repo_pref_file,
                repo_url,
                self.origin,
                self.repo_pin_priority,
            )

        # Install apt-transport-https / ca-certificates when the files that
        # mark their presence do not exist.
        prerequisite_pkgs = []
        if not exists(apt.APT_METHOD_HTTPS_FILE):
            prerequisite_pkgs.append("apt-transport-https")
        if not exists(apt.CA_CERTIFICATES_FILE):
            prerequisite_pkgs.append("ca-certificates")

        if prerequisite_pkgs:
            if not silent:
                event.info(
                    messages.INSTALLING_PACKAGES.format(
                        packages=", ".join(prerequisite_pkgs)
                    )
                )
            try:
                apt.run_apt_install_command(packages=prerequisite_pkgs)
            except exceptions.UbuntuProError:
                # Roll back what was configured so far before re-raising
                self.remove_apt_config()
                raise
        apt.add_auth_apt_repo(
            repo_filename,
            self.repo_url_tmpl.format(repo_url),
            token,
            repo_suites,
            self.repo_key_file,
        )
        # Run apt-update on any repo-entitlement enable because the machine
        # probably wants access to the repo that was just enabled.
        # Side-effect is that apt policy will now report the repo as accessible
        # which allows pro status to report correct info
        if not silent:
            event.info(messages.APT_UPDATING_LIST.format(name=self.title))
        try:
            apt.update_sources_list(repo_filename)
        except exceptions.UbuntuProError:
            # Remove the repo config again, without running another apt
            # update, and let the error propagate.
            self.remove_apt_config(run_apt_update=False)
            raise

    def remove_apt_config(
        self, run_apt_update: bool = True, silent: bool = False
    ):
        """Remove any repository apt configuration files.

        :param run_apt_update: When True, run the apt update command after
            the apt files have been removed.
        :param silent: When True, do not emit the message announcing the
            apt update.

        :raise MissingAptURLDirective: when the entitlement found in the
            machine token file has no aptURL directive.
        """
        series = system.get_release_info().series
        repo_filename = self.repo_list_file_tmpl.format(name=self.name)

        token_entitlements = self.cfg.machine_token_file.entitlements
        entitlement = token_entitlements[self.name].get("entitlement", {})
        repo_url = entitlement.get("directives", {}).get("aptURL")
        if not repo_url:
            raise exceptions.MissingAptURLDirective(entitlement_name=self.name)

        apt.remove_auth_apt_repo(repo_filename, repo_url, self.repo_key_file)
        apt.remove_apt_list_files(repo_url, series)

        # A pin file is only expected when a pin priority is defined
        if self.repo_pin_priority:
            system.ensure_file_absent(
                self.repo_pref_file_tmpl.format(name=self.name)
            )

        if not run_apt_update:
            return

        if not silent:
            event.info(messages.APT_UPDATING_LISTS)
        apt.run_apt_update_command()

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 6.76 KB 0644
anbox.py File 3.04 KB 0644
base.py File 44.05 KB 0644
cc.py File 1013 B 0644
cis.py File 1.14 KB 0644
entitlement_status.py File 3.12 KB 0644
esm.py File 3.85 KB 0644
fips.py File 19.99 KB 0644
landscape.py File 4.84 KB 0644
livepatch.py File 12.63 KB 0644
realtime.py File 5.46 KB 0644
repo.py File 21.85 KB 0644
ros.py File 1.47 KB 0644