404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.145.8.233: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.helpers import bool_compare
from cfnlint.rules import CloudFormationLintRule, RuleMatch


class CacheClusterFailover(CloudFormationLintRule):
    """Check automatic failover on a cache cluster"""

    id = "E3026"
    shortdesc = "Check Elastic Cache Redis Cluster settings"
    description = (
        "Evaluate Redis Cluster groups to make sure automatic failover is "
        "enabled when cluster mode is enabled"
    )
    source_url = "https://github.com/awslabs/cfn-python-lint"
    tags = ["resources", "elasticcache"]

    def __init__(self):
        """Init"""
        super().__init__()
        self.resource_property_types.append("AWS::ElastiCache::ReplicationGroup")

    def is_cluster_enabled(self, properties):
        """Test if cluster is enabled"""
        if isinstance(properties, dict):
            for property_name, property_value in properties.items():
                if property_name == "cluster-enabled" and property_value == "yes":
                    return True

        return False

    def _test_cluster_settings(
        self, properties, path, pg_properties, pg_path, cfn, scenario
    ):
        """test for each scenario"""
        results = []
        pg_conditions = cfn.get_conditions_from_path(cfn.template, pg_path)
        # test to make sure that any condition that may apply to the path for the Ref
        # is not applicable
        if pg_conditions and scenario:
            for c_name, c_value in scenario.items():
                if c_name in pg_conditions:
                    if c_value not in pg_conditions.get(c_name):
                        return results
        if self.is_cluster_enabled(
            cfn.get_value_from_scenario(pg_properties, scenario)
        ):
            c_props = cfn.get_value_from_scenario(properties, scenario)
            automatic_failover = c_props.get("AutomaticFailoverEnabled")
            if bool_compare(automatic_failover, False):
                pathmessage = path[:] + ["AutomaticFailoverEnabled"]
                if scenario is None:
                    message = '"AutomaticFailoverEnabled" must be misssing or True when setting up a cluster at {0}'
                    results.append(
                        RuleMatch(
                            pathmessage, message.format("/".join(map(str, pathmessage)))
                        )
                    )
                else:
                    message = '"AutomaticFailoverEnabled" must be misssing or True when setting up a cluster when {0} at {1}'
                    scenario_text = " and ".join(
                        [f'when condition "{k}" is {v}' for (k, v) in scenario.items()]
                    )
                    results.append(
                        RuleMatch(
                            pathmessage,
                            message.format(
                                scenario_text, "/".join(map(str, pathmessage))
                            ),
                        )
                    )
            num_node_groups = c_props.get("NumNodeGroups")
            if not num_node_groups:
                # only test cache nodes if num node groups aren't specified
                num_cache_nodes = c_props.get("NumCacheClusters", 0)
                if num_cache_nodes <= 1:
                    pathmessage = path[:] + ["NumCacheClusters"]
                    if scenario is None:
                        message = '"NumCacheClusters" must be greater than one when creating a cluster at {0}'
                        results.append(
                            RuleMatch(
                                pathmessage,
                                message.format("/".join(map(str, pathmessage))),
                            )
                        )
                    else:
                        message = '"NumCacheClusters" must be greater than one when creating a cluster when {0} at {1}'
                        scenario_text = " and ".join(
                            [
                                f'when condition "{k}" is {v}'
                                for (k, v) in scenario.items()
                            ]
                        )
                        results.append(
                            RuleMatch(
                                pathmessage,
                                message.format(
                                    scenario_text, "/".join(map(str, pathmessage))
                                ),
                            )
                        )

        return results

    def test_cluster_settings(self, properties, path, pg_resource_name, pg_path, cfn):
        """Test cluster settings for the parameter group and Replication Group"""
        results = []
        pg_properties = (
            cfn.template.get("Resources", {})
            .get(pg_resource_name, {})
            .get("Properties", {})
            .get("Properties", {})
        )
        scenarios = cfn.get_conditions_scenarios_from_object(
            [properties, pg_properties]
        )
        if scenarios:
            for scenario in scenarios:
                results.extend(
                    self._test_cluster_settings(
                        properties, path, pg_properties, pg_path, cfn, scenario
                    )
                )
        else:
            results.extend(
                self._test_cluster_settings(
                    properties, path, pg_properties, pg_path, cfn, None
                )
            )
        return results

    def match_resource_properties(self, properties, _, path, cfn):
        """Check CloudFormation Properties"""
        matches = []

        parameter_groups = properties.get_safe("CacheParameterGroupName", "", path)
        for parameter_group in parameter_groups:
            pg_value = parameter_group[0]
            pg_path = parameter_group[1]
            if isinstance(pg_value, dict):
                for pg_key, pg_resource in pg_value.items():
                    if pg_key == "Ref" and pg_resource in cfn.get_resources():
                        matches.extend(
                            self.test_cluster_settings(
                                properties, path, pg_resource, pg_path, cfn
                            )
                        )

        return matches

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
CacheClusterFailover.py File 6.25 KB 0644
__init__.py File 106 B 0644