"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import cfnlint.helpers
from cfnlint.data import AdditionalSpecs
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class InstanceSize(CloudFormationLintRule):
"""Check if Resources RDS Instance Size is compatible with the RDS type"""
id = "E3025"
shortdesc = "RDS instance type is compatible with the RDS type"
description = (
"Check the RDS instance types are supported by the type of RDS engine. "
"Only if the values are strings will this be checked."
)
source_url = "https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.DBInstanceClass.html"
tags = ["resources", "rds"]
valid_instance_types = cfnlint.helpers.load_resource(
AdditionalSpecs, "RdsProperties.json"
)
def _get_license_model(self, engine, license_model):
"""Logic to get the correct license model"""
if not license_model:
if engine in self.valid_instance_types.get("license-included"):
license_model = "license-included"
elif engine in self.valid_instance_types.get("bring-your-own-license"):
license_model = "bring-your-own-license"
else:
license_model = "general-public-license"
self.logger.debug(
"Based on Engine: %s we determined the default license will be %s",
engine,
license_model,
)
return license_model
def get_resources(self, cfn):
"""Get resources that can be checked"""
results = []
for resource_name, resource_values in cfn.get_resources(
"AWS::RDS::DBInstance"
).items():
path = ["Resources", resource_name, "Properties"]
properties = resource_values.get("Properties")
# Properties items_safe heps remove conditions and focusing on the actual values and scenarios
for prop_safe, prop_path_safe in properties.items_safe(path):
engine = prop_safe.get("Engine")
inst_class = prop_safe.get("DBInstanceClass")
license_model = prop_safe.get("LicenseModel")
# Need to get a default license model if none provided
# Also need to validate all these values are strings otherwise we cannot
# do validation
if isinstance(engine, str) and isinstance(inst_class, str):
license_model = self._get_license_model(engine, license_model)
if isinstance(license_model, str):
results.append(
{
"Engine": engine,
"DBInstanceClass": inst_class,
"Path": prop_path_safe,
"LicenseModel": license_model,
}
)
else:
self.logger.debug(
"Skip evaluation based on [LicenseModel] not being a string."
)
else:
self.logger.debug(
"Skip evaluation based on [Engine] or [DBInstanceClass] not being strings."
)
return results
def check_db_config(self, properties, region):
"""Check db properties"""
matches = []
db_engine = properties.get("Engine")
db_license = properties.get("LicenseModel")
db_instance_class = properties.get("DBInstanceClass")
if db_license in self.valid_instance_types:
if db_engine in self.valid_instance_types[db_license]:
if region in self.valid_instance_types[db_license][db_engine]:
if (
db_instance_class
not in self.valid_instance_types[db_license][db_engine][region]
):
message = 'DBInstanceClass "{0}" is not compatible with engine type "{1}" and LicenseModel "{2}" in region "{3}". Use instance types [{4}]'
matches.append(
RuleMatch(
properties.get("Path") + ["DBInstanceClass"],
message.format(
db_instance_class,
db_engine,
db_license,
region,
", ".join(
map(
str,
self.valid_instance_types[db_license][
db_engine
][region],
)
),
),
)
)
else:
self.logger.debug(
"Skip evaluation based on license [%s] not matching.", db_license
)
return matches
def match(self, cfn):
"""Check RDS Resource Instance Sizes"""
matches = []
resources = self.get_resources(cfn)
for resource in resources:
for region in cfn.regions:
matches.extend(self.check_db_config(resource, region))
return matches