"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class Configuration(CloudFormationLintRule):
    """Check Update Policy Configuration

    Validates the ``UpdatePolicy`` attribute of every resource: the policy
    types used must be known and allowed for the resource type, and each
    attribute inside a policy must be known and hold a value of the expected
    primitive type.
    """

    id = "E3016"
    shortdesc = "Check the configuration of a resources UpdatePolicy"
    description = "Make sure a resources UpdatePolicy is properly configured"
    source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-attribute-updatepolicy.html"
    tags = ["resources", "updatepolicy"]

    # Hand-written specification used by the checks below:
    #   "sub"             - per policy type, the attributes it accepts and the
    #                       primitive type (and optional allowed values) of each.
    #   "main"            - the keys allowed directly under UpdatePolicy. An entry
    #                       carries either "Type" (a key into "sub") or
    #                       "PrimitiveType", plus the resource types it applies to.
    #   "primitive_types" - maps a primitive type name to the Python type used
    #                       for the isinstance() test in check_value.
    valid_attributes = {
        "sub": {
            "AutoScalingReplacingUpdate": {"WillReplace": {"PrimitiveType": "Boolean"}},
            "AutoScalingRollingUpdate": {
                "MaxBatchSize": {"PrimitiveType": "Integer"},
                "MinInstancesInService": {"PrimitiveType": "Integer"},
                "MinSuccessfulInstancesPercent": {"PrimitiveType": "Integer"},
                "PauseTime": {"PrimitiveType": "String"},
                "SuspendProcesses": {
                    "PrimitiveType": "List",
                    "ValidValues": [
                        "Launch",
                        "Terminate",
                        "HealthCheck",
                        "ReplaceUnhealthy",
                        "AZRebalance",
                        "AlarmNotification",
                        "ScheduledActions",
                        "AddToLoadBalancer",
                        "InstanceRefresh",
                    ],
                },
                "WaitOnResourceSignals": {"PrimitiveType": "Boolean"},
            },
            "AutoScalingScheduledAction": {
                "IgnoreUnmodifiedGroupSizeProperties": {"PrimitiveType": "Boolean"}
            },
            "CodeDeployLambdaAliasUpdate": {
                "AfterAllowTrafficHook": {"PrimitiveType": "String"},
                "ApplicationName": {"PrimitiveType": "String"},
                "BeforeAllowTrafficHook": {"PrimitiveType": "String"},
                "DeploymentGroupName": {"PrimitiveType": "String"},
            },
        },
        "main": {
            "AutoScalingReplacingUpdate": {
                "Type": "AutoScalingReplacingUpdate",
                "ResourceTypes": ["AWS::AutoScaling::AutoScalingGroup"],
            },
            "AutoScalingRollingUpdate": {
                "Type": "AutoScalingRollingUpdate",
                "ResourceTypes": ["AWS::AutoScaling::AutoScalingGroup"],
            },
            "AutoScalingScheduledAction": {
                "Type": "AutoScalingScheduledAction",
                "ResourceTypes": ["AWS::AutoScaling::AutoScalingGroup"],
            },
            "CodeDeployLambdaAliasUpdate": {
                "Type": "CodeDeployLambdaAliasUpdate",
                "ResourceTypes": ["AWS::Lambda::Alias"],
            },
            "EnableVersionUpgrade": {
                "PrimitiveType": "Boolean",
                "ResourceTypes": [
                    "AWS::Elasticsearch::Domain",
                    "AWS::OpenSearchService::Domain",
                ],
            },
            "UseOnlineResharding": {
                "PrimitiveType": "Boolean",
                "ResourceTypes": ["AWS::ElastiCache::ReplicationGroup"],
            },
        },
        "primitive_types": {
            "String": (str),
            "Integer": (int),
            "Boolean": (bool),
            "List": list,
        },
    }

    def check_value(self, value, path, **kwargs):
        """Check a primitive value

        Handed to ``cfn.check_value`` as its ``check_value`` callback by
        ``check_attributes`` and ``check_update_policy``.

        Args:
            value: the template value to validate.
            path: template path attached to any match that is reported.
            **kwargs: reads ``primitive_type`` (a key of
                ``valid_attributes["primitive_types"]``), ``valid_values``
                (allowed values, only consulted for ``"List"``) and
                ``key_name`` (attribute name used in messages).

        Returns:
            A list of ``RuleMatch`` objects; empty when the value is accepted.
        """
        matches = []
        prim_type = kwargs.get("primitive_type")
        key_name = kwargs.get("key_name")

        if prim_type == "List":
            # NOTE(review): the membership test implies this is called once per
            # list item rather than with the whole list - confirm against
            # cfn.check_value.
            valid_values = kwargs.get("valid_values")
            if valid_values and value not in valid_values:
                message = "Allowed values for {0} are ({1})"
                matches.append(
                    RuleMatch(
                        path,
                        message.format(key_name, ", ".join(map(str, valid_values))),
                    )
                )
            return matches

        default_message = f"Value for {key_name} must be of type {prim_type}"
        expected_type = self.valid_attributes.get("primitive_types").get(prim_type)

        if isinstance(value, expected_type):
            # bool is a subclass of int, so True/False pass the isinstance test
            # for Integer. Reject them here; a check placed after a failed
            # isinstance(value, int) test could never be reached.
            if prim_type == "Integer" and isinstance(value, bool):
                matches.append(RuleMatch(path, default_message))
            return matches

        # The value is not natively of the expected type: accept it only if it
        # can be interpreted as that type.
        try:
            if prim_type in ["String"]:
                str(value)
            elif prim_type in ["Boolean"]:
                if value not in ["True", "true", "False", "false"]:
                    matches.append(RuleMatch(path, default_message))
            else:  # has to be integer
                if isinstance(value, float):
                    # A float is acceptable only when it has no fractional part.
                    if not value.is_integer():
                        matches.append(RuleMatch(path, default_message))
                else:
                    int(value)
        except Exception:  # pylint: disable=W0703
            # Any failure to convert means the value is not of the expected type.
            matches.append(RuleMatch(path, default_message))

        return matches

    def check_attributes(self, cfn, properties, spec_type, path):
        """
        Check the properties against the spec

        Args:
            cfn: template object; its ``check_value`` method is used to
                validate primitive attributes.
            properties: the value found under one policy type. It must be a
                dict exposing ``items_safe``.
            spec_type: key into ``valid_attributes["sub"]`` selecting the
                attribute spec to validate against.
            path: template path of ``properties``.

        Returns:
            A list of ``RuleMatch`` objects; empty when everything is valid.
        """
        matches = []
        spec = self.valid_attributes.get("sub").get(spec_type)

        if not isinstance(properties, dict):
            message = "{0} should be an object"
            matches.append(RuleMatch(path, message.format(path[-1])))
            return matches

        # NOTE(review): items_safe presumably yields one (dict, path) pair per
        # resolvable variant of the properties - confirm against cfn-lint.
        for p_value_safe, p_path_safe in properties.items_safe(path):
            for p_name, p_value in p_value_safe.items():
                if p_name not in spec:
                    message = "UpdatePolicy doesn't support type {0}"
                    # Report against the path yielded by items_safe, the same
                    # base path used for supported attributes in this loop.
                    matches.append(
                        RuleMatch(p_path_safe[:] + [p_name], message.format(p_name))
                    )
                    continue

                up_type_spec = spec.get(p_name)
                if "Type" in up_type_spec:
                    # Nested object: validate it against its own sub spec.
                    matches.extend(
                        self.check_attributes(
                            cfn,
                            p_value,
                            up_type_spec.get("Type"),
                            p_path_safe[:] + [p_name],
                        )
                    )
                else:
                    matches.extend(
                        cfn.check_value(
                            obj={p_name: p_value},
                            key=p_name,
                            path=p_path_safe[:],
                            check_value=self.check_value,
                            valid_values=up_type_spec.get("ValidValues", []),
                            primitive_type=up_type_spec.get("PrimitiveType"),
                            key_name=p_name,
                        )
                    )

        return matches

    def check_update_policy(self, cfn, update_policy, path, resource_type):
        """Check an update policy

        Args:
            cfn: template object; its ``check_value`` method is used to
                validate primitive policy values.
            update_policy: dict mapping policy type names to their values.
            path: template path of ``update_policy``.
            resource_type: the ``Type`` of the resource owning the policy.

        Returns:
            A list of ``RuleMatch`` objects; empty when everything is valid.
        """
        matches = []
        main_spec = self.valid_attributes.get("main")

        for up_type, up_value in update_policy.items():
            up_path = path[:] + [up_type]
            up_type_spec = main_spec.get(up_type)

            if not up_type_spec:
                message = "UpdatePolicy doesn't support type {0}"
                matches.append(RuleMatch(up_path, message.format(up_type)))
                continue

            allowed_types = up_type_spec.get("ResourceTypes")
            if resource_type not in allowed_types:
                message = "UpdatePolicy only supports this type for resources of type ({0})"
                matches.append(
                    RuleMatch(
                        up_path,
                        message.format(", ".join(map(str, allowed_types))),
                    )
                )

            # The contents are validated even when the resource type is wrong,
            # so both problems are reported together.
            if "Type" in up_type_spec:
                matches.extend(
                    self.check_attributes(
                        cfn, up_value, up_type_spec.get("Type"), up_path[:]
                    )
                )
            else:
                matches.extend(
                    cfn.check_value(
                        obj={up_type: up_value},
                        key=up_type,
                        path=up_path[:],
                        check_value=self.check_value,
                        valid_values=up_type_spec.get("ValidValues", []),
                        primitive_type=up_type_spec.get("PrimitiveType"),
                        key_name=up_type,
                    )
                )

        return matches

    def match(self, cfn):
        """Check the UpdatePolicy of every resource in the template

        Resources without an ``UpdatePolicy`` are skipped. An ``UpdatePolicy``
        that is not a dict is reported; otherwise each variant yielded by
        ``items_safe`` is validated with ``check_update_policy``.

        Args:
            cfn: template object providing ``get_resources`` and
                ``check_value``.

        Returns:
            A list of ``RuleMatch`` objects; empty when everything is valid.
        """
        matches = []

        for r_name, r_values in cfn.get_resources().items():
            update_policy = r_values.get("UpdatePolicy")
            if update_policy is None:
                continue

            path = ["Resources", r_name, "UpdatePolicy"]
            if not isinstance(update_policy, dict):
                message = "UpdatePolicy should be an object"
                matches.append(RuleMatch(path, message))
                continue

            resource_type = r_values.get("Type")
            for up_value_safe, up_path_safe in update_policy.items_safe(path):
                matches.extend(
                    self.check_update_policy(
                        cfn, up_value_safe, up_path_safe, resource_type
                    )
                )

        return matches