"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.helpers import bool_compare
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class CacheClusterFailover(CloudFormationLintRule):
"""Check automatic failover on a cache cluster"""
id = "E3026"
shortdesc = "Check Elastic Cache Redis Cluster settings"
description = (
"Evaluate Redis Cluster groups to make sure automatic failover is "
"enabled when cluster mode is enabled"
)
source_url = "https://github.com/awslabs/cfn-python-lint"
tags = ["resources", "elasticcache"]
def __init__(self):
"""Init"""
super().__init__()
self.resource_property_types.append("AWS::ElastiCache::ReplicationGroup")
def is_cluster_enabled(self, properties):
"""Test if cluster is enabled"""
if isinstance(properties, dict):
for property_name, property_value in properties.items():
if property_name == "cluster-enabled" and property_value == "yes":
return True
return False
def _test_cluster_settings(
self, properties, path, pg_properties, pg_path, cfn, scenario
):
"""test for each scenario"""
results = []
pg_conditions = cfn.get_conditions_from_path(cfn.template, pg_path)
# test to make sure that any condition that may apply to the path for the Ref
# is not applicable
if pg_conditions and scenario:
for c_name, c_value in scenario.items():
if c_name in pg_conditions:
if c_value not in pg_conditions.get(c_name):
return results
if self.is_cluster_enabled(
cfn.get_value_from_scenario(pg_properties, scenario)
):
c_props = cfn.get_value_from_scenario(properties, scenario)
automatic_failover = c_props.get("AutomaticFailoverEnabled")
if bool_compare(automatic_failover, False):
pathmessage = path[:] + ["AutomaticFailoverEnabled"]
if scenario is None:
message = '"AutomaticFailoverEnabled" must be misssing or True when setting up a cluster at {0}'
results.append(
RuleMatch(
pathmessage, message.format("/".join(map(str, pathmessage)))
)
)
else:
message = '"AutomaticFailoverEnabled" must be misssing or True when setting up a cluster when {0} at {1}'
scenario_text = " and ".join(
[f'when condition "{k}" is {v}' for (k, v) in scenario.items()]
)
results.append(
RuleMatch(
pathmessage,
message.format(
scenario_text, "/".join(map(str, pathmessage))
),
)
)
num_node_groups = c_props.get("NumNodeGroups")
if not num_node_groups:
# only test cache nodes if num node groups aren't specified
num_cache_nodes = c_props.get("NumCacheClusters", 0)
if num_cache_nodes <= 1:
pathmessage = path[:] + ["NumCacheClusters"]
if scenario is None:
message = '"NumCacheClusters" must be greater than one when creating a cluster at {0}'
results.append(
RuleMatch(
pathmessage,
message.format("/".join(map(str, pathmessage))),
)
)
else:
message = '"NumCacheClusters" must be greater than one when creating a cluster when {0} at {1}'
scenario_text = " and ".join(
[
f'when condition "{k}" is {v}'
for (k, v) in scenario.items()
]
)
results.append(
RuleMatch(
pathmessage,
message.format(
scenario_text, "/".join(map(str, pathmessage))
),
)
)
return results
def test_cluster_settings(self, properties, path, pg_resource_name, pg_path, cfn):
"""Test cluster settings for the parameter group and Replication Group"""
results = []
pg_properties = (
cfn.template.get("Resources", {})
.get(pg_resource_name, {})
.get("Properties", {})
.get("Properties", {})
)
scenarios = cfn.get_conditions_scenarios_from_object(
[properties, pg_properties]
)
if scenarios:
for scenario in scenarios:
results.extend(
self._test_cluster_settings(
properties, path, pg_properties, pg_path, cfn, scenario
)
)
else:
results.extend(
self._test_cluster_settings(
properties, path, pg_properties, pg_path, cfn, None
)
)
return results
def match_resource_properties(self, properties, _, path, cfn):
"""Check CloudFormation Properties"""
matches = []
parameter_groups = properties.get_safe("CacheParameterGroupName", "", path)
for parameter_group in parameter_groups:
pg_value = parameter_group[0]
pg_path = parameter_group[1]
if isinstance(pg_value, dict):
for pg_key, pg_resource in pg_value.items():
if pg_key == "Ref" and pg_resource in cfn.get_resources():
matches.extend(
self.test_cluster_settings(
properties, path, pg_resource, pg_path, cfn
)
)
return matches