404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.21.186.117: ~ $
"""
This module is responsible for handling all events
that must be raised to the user somehow. The main idea
behind this module is to centralize all events that happen
during the execution of Pro commands and allow us to report
those events in real time or through a machine-readable format.
"""

import enum
import json
import sys
from typing import Any, Dict, List, Optional, Set, Union  # noqa: F401

from uaclient.yaml import safe_dump

# Value emitted under the "_schema_version" key of the services response.
JSON_SCHEMA_VERSION = "0.1"
# Type of the values held by a recorded error/warning event entry.
EventFieldErrorType = Optional[Union[str, Dict[str, str]]]
# Shared EventLogger instance; stays None until get_event_logger creates it.
_event_logger = None


def get_event_logger():
    """Return the module-wide EventLogger, creating it on the first call."""
    global _event_logger

    if _event_logger is not None:
        return _event_logger

    # First request: build the shared instance and remember it.
    _event_logger = EventLogger()
    return _event_logger


@enum.unique
class EventLoggerMode(enum.Enum):
    """
    Defines event logger supported modes.
    Currently, we only support the cli and machine-readable mode. On cli mode,
    we will print to stdout/stderr any event that we receive. Otherwise, we
    will store those events and parse them for the specified format.
    """

    # Each member is a distinct object() instance, so members carry no
    # meaningful value and are told apart by identity/name only.
    CLI = object()
    JSON = object()
    YAML = object()


def format_machine_readable_output(status: Dict[str, Any]) -> Dict[str, Any]:
    """
    Adapt a status dict for the machine-readable output.

    The dict is modified in place and the same object is returned:
    the Pro environment variables are added under "environment_vars",
    the "origin" entry is dropped, a "services" list is guaranteed to
    exist and the "variants" entry is removed from every service.
    """
    from uaclient.util import get_pro_environment

    environment_vars = []
    for name, value in sorted(get_pro_environment().items()):
        environment_vars.append({"name": name, "value": value})
    status["environment_vars"] = environment_vars

    # The origin info is not part of the machine-readable output
    status.pop("origin", "")

    # If status failed before the services were processed, the key may
    # be missing; make sure consumers always find a list there
    services = status.setdefault("services", [])

    # Variant information is stripped from every service, as its
    # machine-readable representation has not been settled yet
    for entry in services:
        if "variants" not in entry:
            continue
        del entry["variants"]

    return status


class EventLogger:
    """
    Collect the events raised while a command runs and report them.

    On CLI mode, info messages are printed as soon as they arrive and
    errors/warnings are not stored. On any other mode, info messages are
    suppressed and errors/warnings are recorded so that process_events
    can emit them as a single machine-readable document.
    """

    def __init__(self):
        # The logger starts on CLI mode, printing every info message
        # it receives.
        self._event_logger_mode = EventLoggerMode.CLI

        self._command = ""
        self._output_content = {}
        self._needs_reboot = False
        self._processed_services = set()  # type: Set[str]
        self._failed_services = set()  # type: Set[str]
        self._error_events = []  # type: List[Dict[str, EventFieldErrorType]]
        self._warning_events = []  # type: List[Dict[str, EventFieldErrorType]]

    def reset(self):
        """Bring every attribute back to its initial value."""
        self._event_logger_mode = EventLoggerMode.CLI
        self._command = ""
        self._output_content = {}
        self._needs_reboot = False
        self._processed_services = set()
        self._failed_services = set()
        self._error_events = []
        self._warning_events = []

    def set_event_mode(self, event_mode: EventLoggerMode):
        """Select the mode (CLI, JSON or YAML) the logger operates on."""
        self._event_logger_mode = event_mode

    def set_command(self, command: str):
        """Store the name of the command being executed.

        process_events uses it to choose which output method to run.
        """
        self._command = command

    def set_output_content(self, output_content: Dict):
        """Store the content used to build the status output."""
        self._output_content = output_content

    def info(self, info_msg: str, file_type=None, end: Optional[str] = None):
        """
        Print info_msg, but only when the logger is on CLI mode.

        The message goes to file_type, or to sys.stdout when no
        file_type is given.
        """
        if self._event_logger_mode != EventLoggerMode.CLI:
            return

        print(info_msg, file=file_type or sys.stdout, end=end)

    def _record_dict_event(
        self,
        msg: str,
        service: Optional[str],
        event_dict: List[Dict[str, EventFieldErrorType]],
        code: Optional[str] = None,
        event_type: Optional[str] = None,
        additional_info: Optional[Dict[str, str]] = None,
    ):
        """
        Build an event entry and append it to event_dict.

        When no event_type is given, the entry is typed "service" if a
        service was provided and "system" otherwise. The
        "additional_info" key is only present when additional_info is
        not empty.
        """
        resolved_type = event_type
        if resolved_type is None:
            resolved_type = "system" if not service else "service"

        entry = {
            "type": resolved_type,
            "service": service,
            "message": msg,
            "message_code": code,
        }  # type: Dict[str, EventFieldErrorType]

        if additional_info:
            entry["additional_info"] = additional_info

        event_dict.append(entry)

    def error(
        self,
        error_msg: str,
        error_code: Optional[str] = None,
        service: Optional[str] = None,
        error_type: Optional[str] = None,
        additional_info: Optional[Dict[str, str]] = None,
    ):
        """
        Record an error event.

        Nothing is recorded while the logger is on CLI mode.
        """
        if self._event_logger_mode == EventLoggerMode.CLI:
            return

        self._record_dict_event(
            msg=error_msg,
            service=service,
            event_dict=self._error_events,
            code=error_code,
            event_type=error_type,
            additional_info=additional_info,
        )

    def warning(self, warning_msg: str, service: Optional[str] = None):
        """
        Record a warning event.

        Nothing is recorded while the logger is on CLI mode.
        """
        if self._event_logger_mode == EventLoggerMode.CLI:
            return

        self._record_dict_event(
            msg=warning_msg,
            service=service,
            event_dict=self._warning_events,
        )

    def service_processed(self, service: str):
        """Mark a service as processed."""
        self._processed_services.add(service)

    def services_failed(self, services: List[str]):
        """Mark every service in services as failed."""
        self._failed_services.update(services)

    def service_failed(self, service: str):
        """Mark a single service as failed."""
        self._failed_services.add(service)

    def needs_reboot(self, reboot_required: bool):
        """Store whether a reboot is required."""
        self._needs_reboot = reboot_required

    def _generate_failed_services(self):
        """
        Return the failed services as a list.

        Combines the services explicitly marked as failed with the
        services named by the recorded error events.
        """
        services_with_error = set()
        for event in self._error_events:
            if event["service"]:
                services_with_error.add(event["service"])

        return list(self._failed_services.union(services_with_error))

    def _process_events_services(self):
        """Print the services response as a JSON document."""
        result = "failure" if self._error_events else "success"
        response = {
            "_schema_version": JSON_SCHEMA_VERSION,
            "result": result,
            "processed_services": sorted(self._processed_services),
            "failed_services": sorted(self._generate_failed_services()),
            "errors": self._error_events,
            "warnings": self._warning_events,
            "needs_reboot": self._needs_reboot,
        }

        from uaclient.util import DatetimeAwareJSONEncoder

        # This response is always serialized as JSON, whatever
        # machine-readable mode is selected.
        serialized = json.dumps(
            response, cls=DatetimeAwareJSONEncoder, sort_keys=True
        )
        print(serialized)

    def _process_events_status(self):
        """
        Print the status output on the format matching the logger mode.

        The stored output content is formatted and extended with the
        result and the recorded errors and warnings. It is printed as
        JSON or YAML; on any other mode nothing is printed.
        """
        output = format_machine_readable_output(self._output_content)
        output.update(
            {
                "result": "failure" if self._error_events else "success",
                "errors": self._error_events,
                "warnings": self._warning_events,
            }
        )

        mode = self._event_logger_mode
        if mode == EventLoggerMode.JSON:
            from uaclient.util import DatetimeAwareJSONEncoder

            serialized = json.dumps(
                output, cls=DatetimeAwareJSONEncoder, sort_keys=True
            )
            print(serialized)
        elif mode == EventLoggerMode.YAML:
            print(safe_dump(output, default_flow_style=False))

    def process_events(self) -> None:
        """
        Emit the machine-readable response for the stored events.

        Nothing is emitted while the logger is on CLI mode. Otherwise,
        the "status" command uses the status output and every other
        command uses the services output.
        """
        if self._event_logger_mode == EventLoggerMode.CLI:
            return

        if self._command == "status":
            self._process_events_status()
            return

        self._process_events_services()

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
api Folder 0755
cli Folder 0755
clouds Folder 0755
daemon Folder 0755
entitlements Folder 0755
files Folder 0755
http Folder 0755
messages Folder 0755
timer Folder 0755
__init__.py File 0 B 0644
actions.py File 8.79 KB 0644
apt.py File 32.73 KB 0644
apt_news.py File 6.54 KB 0644
config.py File 23.1 KB 0644
contract.py File 30.3 KB 0644
contract_data_types.py File 9.89 KB 0644
data_types.py File 10.27 KB 0644
defaults.py File 2.1 KB 0644
event_logger.py File 8.06 KB 0644
exceptions.py File 14.62 KB 0644
gpg.py File 836 B 0644
livepatch.py File 12.51 KB 0644
lock.py File 3.56 KB 0644
log.py File 2.91 KB 0644
security.py File 56.78 KB 0644
security_status.py File 25.25 KB 0644
snap.py File 6.84 KB 0644
status.py File 29.69 KB 0644
system.py File 24.65 KB 0644
types.py File 308 B 0644
upgrade_lts_contract.py File 3.5 KB 0644
util.py File 15.31 KB 0644
version.py File 2.63 KB 0644
yaml.py File 840 B 0644