from typing import List, Optional
from uaclient import apt, messages, util
from uaclient.api.u.pro.security.fix import (
FixPlanAptUpgradeStep,
FixPlanAttachStep,
FixPlanEnableStep,
FixPlanNoOpStatus,
FixPlanNoOpStep,
FixPlanResult,
FixPlanWarningPackageCannotBeInstalled,
FixPlanWarningSecurityIssueNotFixed,
)
from uaclient.api.u.pro.security.fix._common import status_message
from uaclient.data_types import DataObject, Field, StringDataValue, data_list
from uaclient.security import FixStatus
class UpgradedPackage(DataObject):
fields = [
Field("name", StringDataValue),
Field("version", StringDataValue),
Field("pocket", StringDataValue),
]
def __init__(self, name: str, version: str, pocket: str):
self.name = name
self.version = version
self.pocket = pocket
class FailedUpgrade(DataObject):
fields = [
Field("name", StringDataValue),
Field("pocket", StringDataValue, required=False),
]
def __init__(self, name: str, pocket: Optional[str] = None):
self.name = name
self.pocket = pocket
class FixExecuteError(DataObject):
fields = [
Field("error_type", StringDataValue),
Field("reason", StringDataValue),
Field("failed_upgrades", data_list(FailedUpgrade), required=False),
]
def __init__(
self,
error_type: str,
reason: str,
failed_upgrades: Optional[List[FailedUpgrade]] = None,
):
self.error_type = error_type
self.reason = reason
self.failed_upgrades = failed_upgrades
class FixExecuteResult(DataObject):
fields = [
Field("title", StringDataValue),
Field("description", StringDataValue, required=False),
Field("status", StringDataValue),
Field("upgraded_packages", data_list(UpgradedPackage), required=False),
Field("errors", data_list(FixExecuteError), required=False),
]
def __init__(
self,
title: str,
status: str,
description: Optional[str] = None,
upgraded_packages: Optional[List[UpgradedPackage]] = None,
errors: Optional[List[FixExecuteError]] = None,
):
self.title = title
self.description = description
self.status = status
self.upgraded_packages = upgraded_packages
self.errors = errors
class ExecuteContext:
def __init__(self):
self.require_enable = False
self.require_attach = False
self.status = FixStatus.SYSTEM_NON_VULNERABLE.value.msg
self.upgraded_pkgs = [] # type: List[UpgradedPackage]
self.errors = [] # type: List[FixExecuteError]
def _handle_error(
execute_context: ExecuteContext, security_issue: FixPlanResult
):
if security_issue.error:
execute_context.errors.append(
FixExecuteError(
error_type=security_issue.error.code or "unexpected-error",
reason=security_issue.error.msg,
)
)
execute_context.status = "error"
def _handle_security_issue_not_fixed(
execute_context: ExecuteContext,
warning: FixPlanWarningSecurityIssueNotFixed,
):
execute_context.errors.append(
FixExecuteError(
error_type=warning.warning_type,
reason=status_message(warning.data.status),
failed_upgrades=[
FailedUpgrade(name=pkg) for pkg in warning.data.source_packages
],
)
)
execute_context.status = FixStatus.SYSTEM_STILL_VULNERABLE.value.msg
def _handle_package_cannot_be_installed(
execute_context: ExecuteContext,
warning: FixPlanWarningPackageCannotBeInstalled,
):
execute_context.errors.append(
FixExecuteError(
error_type=warning.warning_type,
reason=messages.FIX_CANNOT_INSTALL_PACKAGE.format(
package=warning.data.binary_package,
version=warning.data.binary_package_version,
),
failed_upgrades=[
FailedUpgrade(
name=warning.data.binary_package,
pocket=warning.data.pocket,
)
],
)
)
execute_context.status = FixStatus.SYSTEM_STILL_VULNERABLE.value.msg
def _handle_attach(execute_context: ExecuteContext, step: FixPlanAttachStep):
execute_context.errors.append(
FixExecuteError(
error_type="fix-requires-attach",
reason=messages.SECURITY_UPDATE_NOT_INSTALLED_SUBSCRIPTION,
failed_upgrades=[
FailedUpgrade(name=pkg, pocket=step.data.required_service)
for pkg in step.data.source_packages
],
)
)
execute_context.require_attach = True
execute_context.status = FixStatus.SYSTEM_STILL_VULNERABLE.value.msg
def _handle_enable(execute_context: ExecuteContext, step: FixPlanEnableStep):
if execute_context.require_attach:
return
execute_context.errors.append(
FixExecuteError(
error_type="fix-requires-enable",
reason=messages.SECURITY_SERVICE_DISABLED.format(
service=step.data.service
),
failed_upgrades=[
FailedUpgrade(name=pkg, pocket=step.data.service)
for pkg in step.data.source_packages
],
)
)
execute_context.require_enable = True
execute_context.status = FixStatus.SYSTEM_STILL_VULNERABLE.value.msg
def _handle_apt_upgrade(
execute_context: ExecuteContext, step: FixPlanAptUpgradeStep
):
if execute_context.require_attach or execute_context.require_enable:
return
if not step.data.binary_packages:
return
if not util.we_are_currently_root():
execute_context.errors.append(
FixExecuteError(
error_type="fix-require-root",
reason=messages.SECURITY_APT_NON_ROOT,
failed_upgrades=[
FailedUpgrade(name=pkg, pocket=step.data.pocket)
for pkg in step.data.source_packages
],
)
)
execute_context.status = "error"
return
try:
apt.run_apt_update_command()
apt.run_apt_command(
cmd=["apt-get", "install", "--only-upgrade", "-y"]
+ step.data.binary_packages,
override_env_vars={"DEBIAN_FRONTEND": "noninteractive"},
)
for pkg in step.data.binary_packages:
pkg_version = apt.get_pkg_version(pkg)
if pkg_version:
execute_context.upgraded_pkgs.append(
UpgradedPackage(
name=pkg,
version=pkg_version,
pocket=step.data.pocket,
)
)
except Exception as e:
msg = getattr(e, "msg", str(e))
execute_context.status = FixStatus.SYSTEM_STILL_VULNERABLE.value.msg
execute_context.errors.append(
FixExecuteError(
error_type="fix-error-installing-pkg",
reason=msg,
failed_upgrades=[
FailedUpgrade(name=pkg, pocket=step.data.pocket)
for pkg in step.data.source_packages
],
)
)
def _handle_noop(execute_context: ExecuteContext, step: FixPlanNoOpStep):
if step.data.status == FixPlanNoOpStatus.NOT_AFFECTED.value:
execute_context.status = FixStatus.SYSTEM_NOT_AFFECTED.value.msg
def _execute_fix(security_issue: FixPlanResult) -> FixExecuteResult:
execute_context = ExecuteContext()
if security_issue.error:
_handle_error(execute_context, security_issue)
if security_issue.warnings:
for warning in security_issue.warnings:
if isinstance(warning, FixPlanWarningSecurityIssueNotFixed):
_handle_security_issue_not_fixed(execute_context, warning)
elif isinstance(warning, FixPlanWarningPackageCannotBeInstalled):
_handle_package_cannot_be_installed(execute_context, warning)
if security_issue.plan:
for step in security_issue.plan:
if isinstance(step, FixPlanAttachStep):
_handle_attach(execute_context, step)
elif isinstance(step, FixPlanEnableStep):
_handle_enable(execute_context, step)
elif isinstance(step, FixPlanAptUpgradeStep):
_handle_apt_upgrade(execute_context, step)
elif isinstance(step, FixPlanNoOpStep):
_handle_noop(execute_context, step)
return FixExecuteResult(
title=security_issue.title,
description=security_issue.description,
status=execute_context.status,
upgraded_packages=execute_context.upgraded_pkgs,
errors=None if not execute_context.errors else execute_context.errors,
)