404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.21.186.117: ~ $
import datetime
import glob
import logging
import os
from typing import List, Optional  # noqa: F401

from uaclient import (
    clouds,
    config,
    contract,
    entitlements,
    exceptions,
    livepatch,
)
from uaclient import log as pro_log
from uaclient import status as ua_status
from uaclient import system, timer, util
from uaclient.clouds import AutoAttachCloudInstance  # noqa: F401
from uaclient.clouds import identity
from uaclient.defaults import (
    CLOUD_BUILD_INFO,
    DEFAULT_CONFIG_FILE,
    DEFAULT_LOG_PREFIX,
)
from uaclient.files.state_files import (
    AttachmentData,
    attachment_data_file,
    timer_jobs_state_file,
)

LOG = logging.getLogger(util.replace_top_level_logger_name(__name__))


UA_SERVICES = (
    "ua-timer.service",
    "ua-timer.timer",
    "ua-auto-attach.path",
    "ua-auto-attach.service",
    "ua-reboot-cmds.service",
    "ubuntu-advantage.service",
)

USER_LOG_COLLECTED_LIMIT = 10


def attach_with_token(
    cfg: config.UAConfig, token: str, allow_enable: bool
) -> None:
    """
    Common functionality to take a token and attach via contract backend
    :raise UrlError: On unexpected connectivity issues to contract
        server or inability to access identity doc from metadata service.
    :raise ContractAPIError: On unexpected errors when talking to the contract
        server.
    """
    from uaclient.timer.update_messaging import update_motd_messages

    contract_client = contract.UAContractClient(cfg)
    attached_at = datetime.datetime.now(tz=datetime.timezone.utc)

    try:
        new_machine_token = contract_client.add_contract_machine(
            contract_token=token, attachment_dt=attached_at
        )
    except exceptions.UrlError as e:
        LOG.exception(str(e))
        raise exceptions.ConnectivityError()

    cfg.machine_token_file.write(new_machine_token)

    system.get_machine_id.cache_clear()
    machine_id = new_machine_token.get("machineTokenInfo", {}).get(
        "machineId", system.get_machine_id(cfg)
    )
    cfg.write_cache("machine-id", machine_id)

    try:
        contract.process_entitlements_delta(
            cfg,
            {},
            # we load from the file here instead of using the response
            # so that we get any machine_token_overlay present during testing
            # TODO: decide if there is a better way to do this
            cfg.machine_token_file.entitlements,
            allow_enable,
        )
    except (exceptions.UrlError, exceptions.UbuntuProError) as exc:
        # Persist updated status in the event of partial attach
        attachment_data_file.write(AttachmentData(attached_at=attached_at))
        ua_status.status(cfg=cfg)
        update_motd_messages(cfg)
        contract_client.update_activity_token()
        raise exc

    current_iid = identity.get_instance_id()
    if current_iid:
        cfg.write_cache("instance-id", current_iid)

    attachment_data_file.write(AttachmentData(attached_at=attached_at))
    update_motd_messages(cfg)
    timer.start()


def auto_attach(
    cfg: config.UAConfig,
    cloud: clouds.AutoAttachCloudInstance,
    allow_enable=True,
) -> None:
    """
    :raise UrlError: On unexpected connectivity issues to contract
        server or inability to access identity doc from metadata service.
    :raise ContractAPIError: On unexpected errors when talking to the contract
        server.
    :raise NonAutoAttachImageError: If this cloud type does not have
        auto-attach support.
    """
    contract_client = contract.UAContractClient(cfg)
    tokenResponse = contract_client.get_contract_token_for_cloud_instance(
        instance=cloud
    )

    token = tokenResponse["contractToken"]

    attach_with_token(cfg, token=token, allow_enable=allow_enable)


def enable_entitlement_by_name(
    cfg: config.UAConfig,
    name: str,
    *,
    assume_yes: bool = False,
    allow_beta: bool = False,
    access_only: bool = False,
    variant: str = "",
    extra_args: Optional[List[str]] = None
):
    """
    Constructs an entitlement based on the name provided. Passes kwargs onto
    the entitlement constructor.
    :raise EntitlementNotFoundError: If no entitlement with the given name is
        found, then raises this error.
    """
    ent_cls = entitlements.entitlement_factory(
        cfg=cfg, name=name, variant=variant
    )
    entitlement = ent_cls(
        cfg,
        assume_yes=assume_yes,
        allow_beta=allow_beta,
        called_name=name,
        access_only=access_only,
        extra_args=extra_args,
    )
    return entitlement.enable()


def status(
    cfg: config.UAConfig,
    *,
    simulate_with_token: Optional[str] = None,
    show_all: bool = False
):
    """
    Construct the current Pro status dictionary.
    """
    if simulate_with_token:
        status, ret = ua_status.simulate_status(
            cfg=cfg,
            token=simulate_with_token,
            show_all=show_all,
        )
    else:
        status = ua_status.status(cfg=cfg, show_all=show_all)
        ret = 0

    return status, ret


def _write_command_output_to_file(
    cmd, filename: str, return_codes: Optional[List[int]] = None
) -> None:
    """Helper which runs a command and writes output or error to filename."""
    try:
        out, _ = system.subp(cmd.split(), rcs=return_codes)
    except exceptions.ProcessExecutionError as e:
        system.write_file("{}-error".format(filename), str(e))
    else:
        system.write_file(filename, out)


def _get_state_files(cfg: config.UAConfig):
    # include cfg log files here because they could be set to non default
    return [
        cfg.cfg_path or DEFAULT_CONFIG_FILE,
        cfg.log_file,
        timer_jobs_state_file.ua_file.path,
        CLOUD_BUILD_INFO,
        *(
            entitlement.repo_list_file_tmpl.format(name=entitlement.name)
            for entitlement in entitlements.ENTITLEMENT_CLASSES
            if issubclass(entitlement, entitlements.repo.RepoEntitlement)
        ),
    ]


def collect_logs(cfg: config.UAConfig, output_dir: str):
    """
    Write all relevant Ubuntu Pro logs to the specified directory
    """
    _write_command_output_to_file(
        "cloud-id", "{}/cloud-id.txt".format(output_dir)
    )
    _write_command_output_to_file(
        "pro status --format json", "{}/ua-status.json".format(output_dir)
    )
    _write_command_output_to_file(
        "{} status".format(livepatch.LIVEPATCH_CMD),
        "{}/livepatch-status.txt".format(output_dir),
    )
    _write_command_output_to_file(
        "systemctl list-timers --all",
        "{}/systemd-timers.txt".format(output_dir),
    )
    _write_command_output_to_file(
        (
            "journalctl --boot=0 -o short-precise "
            "-u cloud-init-local.service "
            "-u cloud-init-config.service "
            "-u cloud-config.service"
        ),
        "{}/cloud-init-journal.txt".format(output_dir),
    )
    _write_command_output_to_file(
        ("journalctl -o short-precise " "{}").format(
            " ".join(
                ["-u {}".format(s) for s in UA_SERVICES if ".service" in s]
            )
        ),
        "{}/pro-journal.txt".format(output_dir),
    )
    for service in UA_SERVICES:
        _write_command_output_to_file(
            "systemctl status {}".format(service),
            "{}/{}.txt".format(output_dir, service),
            return_codes=[0, 3],
        )

    state_files = _get_state_files(cfg)
    user_log_files = (
        pro_log.get_all_user_log_files()[:USER_LOG_COLLECTED_LIMIT]
        if util.we_are_currently_root()
        else [pro_log.get_user_log_file()]
    )
    # save log file in compressed file
    for log_file_idx, log_file in enumerate(user_log_files):
        try:
            content = util.redact_sensitive_logs(system.load_file(log_file))
            system.write_file(
                os.path.join(output_dir, "user{}.log".format(log_file_idx)),
                content,
            )
        except Exception as e:
            LOG.warning(
                "Failed to collect user log file: %s\n%s", log_file, str(e)
            )

    # also get default logrotated log files
    for f in state_files + glob.glob(DEFAULT_LOG_PREFIX + "*"):
        if os.path.isfile(f):
            try:
                content = system.load_file(f)
            except Exception as e:
                # If we fail to load that file for any reason we will
                # not break the command, we will instead warn the user
                # about the issue and try to process the other files
                LOG.warning("Failed to load file: %s\n%s", f, str(e))
                continue
            content = util.redact_sensitive_logs(content)
            if util.we_are_currently_root():
                # if root, overwrite the original with redacted content
                system.write_file(f, content)

            system.write_file(
                os.path.join(output_dir, os.path.basename(f)), content
            )

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
api Folder 0755
cli Folder 0755
clouds Folder 0755
daemon Folder 0755
entitlements Folder 0755
files Folder 0755
http Folder 0755
messages Folder 0755
timer Folder 0755
__init__.py File 0 B 0644
actions.py File 8.79 KB 0644
apt.py File 32.73 KB 0644
apt_news.py File 6.54 KB 0644
config.py File 23.1 KB 0644
contract.py File 30.3 KB 0644
contract_data_types.py File 9.89 KB 0644
data_types.py File 10.27 KB 0644
defaults.py File 2.1 KB 0644
event_logger.py File 8.06 KB 0644
exceptions.py File 14.62 KB 0644
gpg.py File 836 B 0644
livepatch.py File 12.51 KB 0644
lock.py File 3.56 KB 0644
log.py File 2.91 KB 0644
security.py File 56.78 KB 0644
security_status.py File 25.25 KB 0644
snap.py File 6.84 KB 0644
status.py File 29.69 KB 0644
system.py File 24.65 KB 0644
types.py File 308 B 0644
upgrade_lts_contract.py File 3.5 KB 0644
util.py File 15.31 KB 0644
version.py File 2.63 KB 0644
yaml.py File 840 B 0644