"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import regex as re
import cfnlint.helpers
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class DynamicReferenceSecureString(CloudFormationLintRule):
"""Check if Dynamic Reference Secure Strings are only used in the correct locations"""
id = "E1027"
shortdesc = "Check dynamic references secure strings are in supported locations"
description = (
"Dynamic References Secure Strings are only supported for a small set of resource properties. "
"Validate that they are being used in the correct location when checking values "
"and Fn::Sub in resource properties. Currently doesn't check outputs, maps, conditions, "
"parameters, and descriptions."
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html"
tags = ["functions", "dynamic reference"]
def __init__(self):
"""Init"""
super().__init__()
self.property_specs = []
self.resource_specs = []
self.exceptions = {
"AWS::DirectoryService::MicrosoftAD": "Password",
"AWS::DirectoryService::SimpleAD": "Password",
"AWS::ElastiCache::ReplicationGroup": "AuthToken",
"AWS::IAM::User.LoginProfile": "Password",
"AWS::KinesisFirehose::DeliveryStream.RedshiftDestinationConfiguration": "Password",
"AWS::OpsWorks::App.Source": "Password",
"AWS::OpsWorks::Stack.RdsDbInstance": "DbPassword",
"AWS::OpsWorks::Stack.Source": "Password",
"AWS::RDS::DBCluster": "MasterUserPassword",
"AWS::RDS::DBInstance": "MasterUserPassword",
"AWS::Redshift::Cluster": "MasterUserPassword",
}
def initialize(self, cfn):
"""Init"""
specs = cfnlint.helpers.RESOURCE_SPECS.get(cfn.regions[0])
self.property_specs = specs.get("PropertyTypes")
self.resource_specs = specs.get("ResourceTypes")
for resource_spec in self.resource_specs:
self.resource_property_types.append(resource_spec)
for property_spec in self.property_specs:
self.resource_sub_property_types.append(property_spec)
def check_dyn_ref_value(self, value, path):
"""Chec item type"""
matches = []
if isinstance(value, str):
if re.match(cfnlint.helpers.REGEX_DYN_REF_SSM_SECURE, value):
message = f'Dynamic Reference secure strings are not supported for this property at {"/".join(map(str, path[:]))}'
matches.append(RuleMatch(path[:], message))
return matches
def check_value(self, value, path, **kwargs):
"""Check Value"""
matches = []
item_type = kwargs.get("item_type", {})
if item_type in ["Map"]:
if isinstance(value, dict):
for map_key, map_value in value.items():
if not isinstance(map_value, dict):
matches.extend(
self.check_dyn_ref_value(map_value, path[:] + [map_key])
)
else:
matches.extend(self.check_dyn_ref_value(value, path[:]))
return matches
# pylint: disable=W0613
# Need to disable for the function to work
def check_sub(self, value, path, **kwargs):
"""Check Sub Function Dynamic References"""
matches = []
if isinstance(value, list):
if isinstance(value[0], str):
matches.extend(self.check_dyn_ref_value(value[0], path[:] + [0]))
else:
matches.extend(self.check_dyn_ref_value(value, path[:]))
return matches
def check(self, cfn, properties, specs, property_type, path):
"""Check itself"""
matches = []
for prop in properties:
if prop in specs:
if property_type in self.exceptions:
if prop == self.exceptions.get(property_type):
continue
primitive_type = specs.get(prop).get("PrimitiveType")
if not primitive_type:
primitive_type = specs.get(prop).get("PrimitiveItemType")
if specs.get(prop).get("Type") in ["List", "Map"]:
item_type = specs.get(prop).get("Type")
else:
item_type = None
if primitive_type:
matches.extend(
cfn.check_value(
properties,
prop,
path[:],
check_value=self.check_value,
check_sub=self.check_sub,
primitive_type=primitive_type,
item_type=item_type,
)
)
return matches
def match_resource_sub_properties(self, properties, property_type, path, cfn):
"""Match for sub properties"""
matches = []
if self.property_specs.get(property_type, {}).get("Properties"):
property_specs = self.property_specs.get(property_type, {}).get(
"Properties", {}
)
matches.extend(
self.check(cfn, properties, property_specs, property_type, path)
)
return matches
def match_resource_properties(self, properties, resource_type, path, cfn):
"""Check CloudFormation Properties"""
matches = []
resource_specs = self.resource_specs.get(resource_type, {}).get(
"Properties", {}
)
matches.extend(self.check(cfn, properties, resource_specs, resource_type, path))
return matches