404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.220.147.154: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import argparse
import copy
import glob
import json
import logging
import os
import sys
from pathlib import Path
from typing import Dict

import jsonschema

import cfnlint.decode.cfn_yaml
from cfnlint.helpers import REGIONS
from cfnlint.version import __version__

# pylint: disable=too-many-public-methods
LOGGER = logging.getLogger("cfnlint")


def configure_logging(debug_logging, info_logging):
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    if debug_logging:
        LOGGER.setLevel(logging.DEBUG)
    elif info_logging:
        LOGGER.setLevel(logging.INFO)
    else:
        LOGGER.setLevel(logging.NOTSET)
    log_formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    ch.setFormatter(log_formatter)

    # make sure all other log handlers are removed before adding it back
    for handler in LOGGER.handlers:
        LOGGER.removeHandler(handler)
    LOGGER.addHandler(ch)


class ConfigFileArgs:
    """
    Config File arguments.
    Parses .cfnlintrc OR .cfnlintrc.yaml OR .cfnlintrc.yml in the Home and Project folder.
    """

    file_args: Dict = {}
    __user_config_file = None
    __project_config_file = None
    __custom_config_file = None

    def __init__(self, schema=None, config_file=None):
        # self.file_args = self.get_config_file_defaults()
        self.file_args = {}
        self.default_schema_file = Path(__file__).parent.joinpath(
            "data/CfnLintCli/config/schema.json"
        )
        with self.default_schema_file.open(encoding="utf-8") as f:
            self.default_schema = json.load(f)
        self.schema = self.default_schema if not schema else schema

        if config_file:
            self.__custom_config_file = config_file
        else:
            LOGGER.debug("Looking for CFLINTRC before attempting to load")
            self.__user_config_file, self.__project_config_file = self._find_config()

        self.load()

    def _find_config(self):
        """Looks up for user and project level config
        Returns
        -------
        Tuple
            (Path, Path)
            Tuple with both configs and whether they were found
        Example
        -------
            > user_config, project_config = self._find_config()
        """
        config_file_name = ".cfnlintrc"
        self.__user_config_file = Path.home().joinpath(config_file_name)

        self.__project_config_file = Path.cwd().joinpath(config_file_name)
        if self._has_file(config_file_name + ".yaml"):
            self.__project_config_file = Path.cwd().joinpath(config_file_name + ".yaml")
        elif self._has_file(config_file_name + ".yml"):
            self.__project_config_file = Path.cwd().joinpath(config_file_name + ".yml")

        user_config_path = ""
        project_config_path = ""

        if self._has_file(self.__user_config_file):
            LOGGER.debug("Found User CFNLINTRC")
            user_config_path = self.__user_config_file

        if self._has_file(self.__project_config_file):
            LOGGER.debug("Found Project level CFNLINTRC")
            project_config_path = self.__project_config_file

        return user_config_path, project_config_path

    def _has_file(self, filename):
        """Confirm whether file exists
        Parameters
        ----------
        filename : str
            Path to a file
        Returns
        -------
        Boolean
        """

        return Path(filename).is_file()

    def load(self):
        """Load configuration file and expose as a dictionary
        Returns
        -------
        Dict
            CFLINTRC configuration
        """

        if self.__custom_config_file:
            custom_config = self._read_config(self.__custom_config_file)
            LOGGER.debug("Validating Custom CFNLINTRC")
            self.validate_config(custom_config, self.schema)
            LOGGER.debug("Custom configuration loaded as")
            LOGGER.debug("%s", custom_config)

            self.file_args = custom_config
        else:
            user_config = self._read_config(self.__user_config_file)
            LOGGER.debug("Validating User CFNLINTRC")
            self.validate_config(user_config, self.schema)

            project_config = self._read_config(self.__project_config_file)
            LOGGER.debug("Validating Project CFNLINTRC")
            self.validate_config(project_config, self.schema)

            LOGGER.debug("User configuration loaded as")
            LOGGER.debug("%s", user_config)
            LOGGER.debug("Project configuration loaded as")
            LOGGER.debug("%s", project_config)

            LOGGER.debug("Merging configurations...")
            self.file_args = self.merge_config(user_config, project_config)

    def validate_config(self, config, schema):
        """Validate configuration against schema
        Parameters
        ----------
        config : dict
            CFNLINTRC configuration
        schema : dict
            JSONSchema to validate against
        Raises
        -------
        jsonschema.exceptions.ValidationError
            Returned when cfnlintrc doesn't match schema provided
        """
        LOGGER.debug("Validating CFNLINTRC config with given JSONSchema")
        LOGGER.debug("Schema used: %s", schema)
        LOGGER.debug("Config used: %s", config)

        jsonschema.validate(config, schema)
        LOGGER.debug("CFNLINTRC looks valid!")

    def merge_config(self, user_config, project_config):
        """Merge project and user configuration into a single dictionary
        Creates a new configuration with both configuration merged
        it favours project level over user configuration if keys are duplicated
        NOTE
        ----
            It takes any number of nested dicts
            It overrides lists found in user_config with project_config
        Parameters
        ----------
        user_config : Dict
            User configuration (~/.cfnlintrc) found at user's home directory
        project_config : Dict
            Project configuration (.cfnlintrc) found at current directory
        Returns
        -------
        Dict
            Merged configuration
        """
        # Recursively override User config with Project config
        for key in user_config:
            if key in project_config:
                # If both keys are the same, let's check whether they have nested keys
                if isinstance(user_config[key], dict) and isinstance(
                    project_config[key], dict
                ):
                    self.merge_config(user_config[key], project_config[key])
                else:
                    user_config[key] = project_config[key]
                    LOGGER.debug(
                        "Overriding User's key %s with Project's specific value %s.",
                        key,
                        project_config[key],
                    )

        # Project may have unique config we need to copy over too
        # so that we can have user+project config available as one
        for key in project_config:
            if key not in user_config:
                user_config[key] = project_config[key]

        return user_config

    def _read_config(self, config):
        """Parse given YAML configuration
        Returns
        -------
        Dict
            Parsed YAML configuration as dictionary
        """
        config = Path(config)
        config_template = None

        if self._has_file(config):
            LOGGER.debug("Parsing CFNLINTRC")
            config_template = cfnlint.decode.cfn_yaml.load(str(config))

        if not config_template:
            config_template = {}

        return config_template


def comma_separated_arg(string):
    """Split a comma separated string"""
    return string.split(",")


def _ensure_value(namespace, name, value):
    if getattr(namespace, name, None) is None:
        setattr(namespace, name, value)
    return getattr(namespace, name)


class RuleConfigurationAction(argparse.Action):
    """Override the default Action"""

    def __init__(
        self,
        option_strings,
        dest,
        nargs=None,
        const=None,
        default=None,
        type=None,
        choices=None,
        required=False,
        help=None,
        metavar=None,
    ):  # pylint: disable=W0622
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=default,
            type=type,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar,
        )

    def _parse_rule_configuration(self, string):
        """Parse the config rule structure"""
        configs = comma_separated_arg(string)
        results = {}
        for config in configs:
            rule_id = config.split(":")[0]
            config_name = config.split(":")[1].split("=")[0]
            config_value = config.split(":")[1].split("=")[1]
            if rule_id not in results:
                results[rule_id] = {}
            results[rule_id][config_name] = config_value

        return results

    def __call__(self, parser, namespace, values, option_string=None):
        items = copy.copy(_ensure_value(namespace, self.dest, {}))
        try:
            for value in values:
                new_value = self._parse_rule_configuration(value)
                for v_k, v_vs in new_value.items():
                    if v_k in items:
                        for s_k, s_v in v_vs.items():
                            items[v_k][s_k] = s_v
                    else:
                        items[v_k] = v_vs
            setattr(namespace, self.dest, items)
        except Exception:  # pylint: disable=W0703
            parser.print_help()
            parser.exit()


class CliArgs:
    """Base Args class"""

    cli_args: Dict = {}

    def __init__(self, cli_args):
        self.parser = self.create_parser()
        self.cli_args, _ = self.parser.parse_known_args(cli_args)

    def create_parser(self):
        """Do first round of parsing parameters to set options"""

        class ArgumentParser(argparse.ArgumentParser):
            """Override Argument Parser so we can control the exit code"""

            def error(self, message):
                self.print_help(sys.stderr)
                self.exit(32, f"{self.prog}: error: {message}\n")

        class ExtendAction(argparse.Action):
            """Support argument types that are lists and can be specified multiple times."""

            def __call__(self, parser, namespace, values, option_string=None):
                items = getattr(namespace, self.dest)
                items = [] if items is None else items
                for value in values:
                    if isinstance(value, list):
                        items.extend(value)
                    else:
                        items.append(value)
                setattr(namespace, self.dest, items)

        usage = (
            "\nBasic: cfn-lint test.yaml\n"
            "Ignore a rule: cfn-lint -i E3012 -- test.yaml\n"
            "Configure a rule: cfn-lint -x E3012:strict=false -t test.yaml\n"
            "Lint all yaml files in a folder: cfn-lint dir/**/*.yaml"
        )

        parser = ArgumentParser(description="CloudFormation Linter", usage=usage)
        parser.register("action", "extend", ExtendAction)

        standard = parser.add_argument_group("Standard")
        advanced = parser.add_argument_group("Advanced / Debugging")

        # Allow the template to be passes as an optional or a positional argument
        standard.add_argument(
            "templates",
            metavar="TEMPLATE",
            nargs="*",
            help="The CloudFormation template to be linted",
        )
        standard.add_argument(
            "-t",
            "--template",
            metavar="TEMPLATE",
            dest="template_alt",
            help="The CloudFormation template to be linted",
            nargs="+",
            default=[],
            action="extend",
        )
        standard.add_argument(
            "-b",
            "--ignore-bad-template",
            help="Ignore failures with Bad template",
            action="store_true",
        )
        standard.add_argument(
            "--ignore-templates",
            dest="ignore_templates",
            help="Ignore templates",
            nargs="+",
            default=[],
            action="extend",
        )
        advanced.add_argument(
            "-D", "--debug", help="Enable debug logging", action="store_true"
        )
        advanced.add_argument(
            "-I", "--info", help="Enable information logging", action="store_true"
        )
        standard.add_argument(
            "-f",
            "--format",
            help="Output Format",
            choices=["quiet", "parseable", "json", "junit", "pretty", "sarif"],
        )
        standard.add_argument(
            "-l",
            "--list-rules",
            dest="listrules",
            default=False,
            action="store_true",
            help="list all the rules",
        )
        standard.add_argument(
            "-r",
            "--regions",
            dest="regions",
            nargs="+",
            default=[],
            type=comma_separated_arg,
            action="extend",
            help="list the regions to validate against.",
        )
        advanced.add_argument(
            "-a",
            "--append-rules",
            dest="append_rules",
            nargs="+",
            default=[],
            type=comma_separated_arg,
            action="extend",
            help="specify one or more rules directories using "
            "one or more --append-rules arguments. ",
        )
        standard.add_argument(
            "-i",
            "--ignore-checks",
            dest="ignore_checks",
            nargs="+",
            default=[],
            type=comma_separated_arg,
            action="extend",
            help="only check rules whose id do not match these values",
        )
        standard.add_argument(
            "-c",
            "--include-checks",
            dest="include_checks",
            nargs="+",
            default=[],
            type=comma_separated_arg,
            action="extend",
            help="include rules whose id match these values",
        )
        standard.add_argument(
            "-m",
            "--mandatory-checks",
            dest="mandatory_checks",
            nargs="+",
            default=[],
            type=comma_separated_arg,
            action="extend",
            help="always check rules whose id match these values, regardless of template exclusions",
        )
        standard.add_argument(
            "-e",
            "--include-experimental",
            help="Include experimental rules",
            action="store_true",
        )
        standard.add_argument(
            "-x",
            "--configure-rule",
            dest="configure_rules",
            nargs="+",
            default={},
            action=RuleConfigurationAction,
            help="Provide configuration for a rule. Format RuleId:key=value. Example: E3012:strict=false",
        )
        standard.add_argument(
            "--config-file",
            dest="config_file",
            help="Specify the cfnlintrc file to use",
        )
        standard.add_argument(
            "-z",
            "--custom-rules",
            dest="custom_rules",
            help="Allows specification of a custom rule file.",
        )
        advanced.add_argument(
            "-o",
            "--override-spec",
            dest="override_spec",
            help="A CloudFormation Spec override file that allows customization",
        )
        advanced.add_argument(
            "-g",
            "--build-graph",
            help="Creates a file in the same directory as the template that models the template's resources in DOT format",
            action="store_true",
        )
        advanced.add_argument(
            "-s",
            "--registry-schemas",
            help="one or more directories of CloudFormation Registry Schemas",
            action="extend",
            type=comma_separated_arg,
            nargs="+",
        )
        standard.add_argument(
            "-v",
            "--version",
            help="Version of cfn-lint",
            action="version",
            version=f"%(prog)s {__version__}",
        )
        advanced.add_argument(
            "-u",
            "--update-specs",
            help="Update the CloudFormation Specs",
            action="store_true",
        )
        advanced.add_argument(
            "--update-documentation", help=argparse.SUPPRESS, action="store_true"
        )
        advanced.add_argument(
            "--update-iam-policies", help=argparse.SUPPRESS, action="store_true"
        )
        standard.add_argument(
            "--output-file",
            type=str,
            default=None,
            help="Writes the output to the specified file, ideal for producing reports",
        )
        standard.add_argument(
            "--merge-configs",
            default=False,
            action="store_true",
            help="Merges lists between configuration layers",
        )
        standard.add_argument(
            "--non-zero-exit-code",
            type=str,
            default="informational",
            choices=["informational", "warning", "error", "none"],
            help="Exit code will be non zero from the specified rule class and higher",
        )
        advanced.add_argument("--force", help=argparse.SUPPRESS, action="store_true")

        return parser


class TemplateArgs:
    """Per Template Args"""

    def __init__(self, template_args):
        self.set_template_args(template_args)

    def get_template_args(self):
        return self._template_args

    def set_template_args(self, template):
        defaults = {}
        if isinstance(template, dict):
            metadata = template.get("Metadata", {})
            if metadata:
                configs = (
                    template.get("Metadata", {}).get("cfn-lint", {}).get("config", {})
                )

                if isinstance(configs, dict):
                    for config_name, config_value in configs.items():
                        if config_name == "ignore_checks":
                            if isinstance(config_value, list):
                                defaults["ignore_checks"] = config_value
                        if config_name == "regions":
                            if isinstance(config_value, list):
                                defaults["regions"] = config_value
                        if config_name == "append_rules":
                            if isinstance(config_value, list):
                                defaults["append_rules"] = config_value
                        if config_name == "override_spec":
                            if isinstance(config_value, (str)):
                                defaults["override_spec"] = config_value
                        if config_name == "custom_rules":
                            if isinstance(config_value, (str)):
                                defaults["custom_rules"] = config_value
                        if config_name == "ignore_bad_template":
                            if isinstance(config_value, bool):
                                defaults["ignore_bad_template"] = config_value
                        if config_name == "include_checks":
                            if isinstance(config_value, list):
                                defaults["include_checks"] = config_value
                        if config_name == "configure_rules":
                            if isinstance(config_value, dict):
                                defaults["configure_rules"] = config_value

        self._template_args = defaults

    template_args = property(get_template_args, set_template_args)


# pylint: disable=too-many-public-methods
class ConfigMixIn(TemplateArgs, CliArgs, ConfigFileArgs):
    """Mixin for the Configs"""

    def __init__(self, cli_args):
        CliArgs.__init__(self, cli_args)
        # configure debug as soon as we can
        configure_logging(self.cli_args.debug, self.cli_args.info)
        TemplateArgs.__init__(self, {})
        ConfigFileArgs.__init__(
            self, config_file=self._get_argument_value("config_file", False, False)
        )

    def _get_argument_value(self, arg_name, is_template, is_config_file):
        cli_value = getattr(self.cli_args, arg_name)
        template_value = self.template_args.get(arg_name)
        file_value = self.file_args.get(arg_name)

        # merge list configurations
        # make sure we don't do an infinite loop so skip this check for merge_configs
        if arg_name != "merge_configs":
            if self.merge_configs:
                # the CLI will always have an empty list when the item is a list
                # we will use that to evaluate if we need to merge the lists
                if isinstance(cli_value, list):
                    # Use a copy here, otherwise we will accumulate template level config
                    # into the cli_value which will  persist between template files
                    result = cli_value.copy()
                    if isinstance(template_value, list):
                        result.extend(template_value)
                    if isinstance(file_value, list):
                        result.extend(file_value)
                    return result

        # return individual items
        if cli_value:
            return cli_value
        if template_value and is_template:
            return template_value
        if file_value and is_config_file:
            return file_value
        return cli_value

    @property
    def ignore_checks(self):
        return self._get_argument_value("ignore_checks", True, True)

    @property
    def include_checks(self):
        results = self._get_argument_value("include_checks", True, True)
        return ["W", "E"] + results

    @property
    def mandatory_checks(self):
        return self._get_argument_value("mandatory_checks", False, True)

    @property
    def include_experimental(self):
        return self._get_argument_value("include_experimental", True, True)

    @property
    def regions(self):
        results = self._get_argument_value("regions", True, True)
        if not results:
            default_region_env = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
            return [os.environ.get("AWS_REGION", default_region_env)]
        if "ALL_REGIONS" in results:
            return REGIONS
        return results

    @property
    def ignore_bad_template(self):
        return self._get_argument_value("ignore_bad_template", True, True)

    @property
    def debug(self):
        return self._get_argument_value("debug", False, False)

    @property
    def format(self):
        return self._get_argument_value("format", False, True)

    @property
    def templates(self):
        templates_args = self._get_argument_value("templates", False, True)
        template_alt_args = self._get_argument_value("template_alt", False, False)
        if template_alt_args:
            filenames = template_alt_args
        elif templates_args:
            filenames = templates_args
        else:
            return None

        # if only one is specified convert it to array
        if isinstance(filenames, str):
            filenames = [filenames]

        # handle different shells and Config files
        # some shells don't expand * and configparser won't expand wildcards
        all_filenames = []
        ignore_templates = self._ignore_templates()
        for filename in filenames:
            add_filenames = glob.glob(filename, recursive=True)
            # only way to know of the glob failed is to test it
            # then add the filename as requested
            if not add_filenames:
                if filename not in ignore_templates:
                    all_filenames.append(filename)
            else:
                for add_filename in add_filenames:
                    if add_filename not in ignore_templates:
                        all_filenames.append(add_filename)

        return sorted(all_filenames)

    def _ignore_templates(self):
        ignore_template_args = self._get_argument_value("ignore_templates", False, True)
        if ignore_template_args:
            filenames = ignore_template_args
        else:
            return []

        # if only one is specified convert it to array
        if isinstance(filenames, str):
            filenames = [filenames]

        # handle different shells and Config files
        # some shells don't expand * and configparser won't expand wildcards
        all_filenames = []
        for filename in filenames:
            add_filenames = glob.glob(filename, recursive=True)
            # only way to know of the glob failed is to test it
            # then add the filename as requested
            if not add_filenames:
                all_filenames.append(filename)
            else:
                all_filenames.extend(add_filenames)

        return all_filenames

    @property
    def append_rules(self):
        return self._get_argument_value("append_rules", False, True)

    @property
    def override_spec(self):
        return self._get_argument_value("override_spec", False, True)

    @property
    def custom_rules(self):
        """custom_rules_spec"""
        return self._get_argument_value("custom_rules", False, True)

    @property
    def update_specs(self):
        return self._get_argument_value("update_specs", False, False)

    @property
    def update_documentation(self):
        return self._get_argument_value("update_documentation", False, False)

    @property
    def update_iam_policies(self):
        return self._get_argument_value("update_iam_policies", False, False)

    @property
    def listrules(self):
        return self._get_argument_value("listrules", False, False)

    @property
    def configure_rules(self):
        return self._get_argument_value("configure_rules", True, True)

    @property
    def config_file(self):
        return self._get_argument_value("config_file", False, False)

    @property
    def build_graph(self):
        return self._get_argument_value("build_graph", False, False)

    @property
    def output_file(self):
        return self._get_argument_value("output_file", False, True)

    @property
    def registry_schemas(self):
        return self._get_argument_value("registry_schemas", False, True)

    @property
    def merge_configs(self):
        return self._get_argument_value("merge_configs", True, True)

    @property
    def non_zero_exit_code(self):
        return self._get_argument_value("non_zero_exit_code", False, False)

    @property
    def force(self):
        return self._get_argument_value("force", False, False)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
conditions Folder 0755
data Folder 0755
decode Folder 0755
decorators Folder 0755
formatters Folder 0755
rules Folder 0755
template Folder 0755
__init__.py File 2.47 KB 0644
__main__.py File 1.78 KB 0644
api.py File 1.59 KB 0644
config.py File 26.56 KB 0644
core.py File 11.02 KB 0644
exceptions.py File 592 B 0644
graph.py File 10.85 KB 0644
helpers.py File 21.95 KB 0644
languageExtensions.py File 1.98 KB 0644
maintenance.py File 18.87 KB 0644
runner.py File 3.93 KB 0644
transform.py File 8.48 KB 0644
version.py File 130 B 0644