"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import regex as re
from cfnlint.helpers import REGEX_ALPHANUMERIC, REGEX_IPV4, REGEX_IPV6
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class RecordSet(CloudFormationLintRule):
"""Check Route53 Recordset Configuration"""
id = "E3020"
shortdesc = "Validate Route53 RecordSets"
description = "Check if all RecordSets are correctly configured"
source_url = "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/ResourceRecordTypes.html"
tags = ["resources", "route53", "record_set"]
# Regex generated from https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DomainNameFormat.html
REGEX_DOMAINNAME = re.compile(
r"^[a-zA-Z0-9\!\"\#\$\%\&\'\(\)\*\+\,-\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~\.]+$"
)
REGEX_TXT = re.compile(r'^("[^"]{1,255}" *)*"[^"]{1,255}"$')
REGEX_CNAME_VALIDATIONS = re.compile(r"^.*\.acm-validations\.aws\.?$")
def count_c_names(self, records, path, cfn):
"""Count C Names"""
matches = []
scenarios = cfn.get_object_without_nested_conditions(records, path)
for scenario in scenarios:
if len(scenario.get("Object")) > 1:
scenario = scenario.get("Scenario")
message = "A CNAME recordset can only contain 1 value"
if scenario is None:
message = "A CNAME recordset can only contain 1 value"
matches.append(
RuleMatch(path, message.format("/".join(map(str, message))))
)
else:
message = "A CNAME recordset can only contain 1 value {0} at {1}"
scenario_text = " and ".join(
[f'when condition "{k}" is {v}' for (k, v) in scenario.items()]
)
matches.append(
RuleMatch(
path,
message.format(scenario_text, "/".join(map(str, path))),
)
)
return matches
def check_record(self, value, path, record_type, regex, regex_name):
matches = []
if isinstance(value, str):
if not re.match(regex, value):
message = record_type + " record ({}) is not a valid " + regex_name
matches.append(RuleMatch(path, message.format(value)))
return matches
def check_a_record(self, value, path):
return self.check_record(value, path, "A", REGEX_IPV4, "IPv4 address")
def check_aaaa_record(self, value, path):
return self.check_record(value, path, "AAAA", REGEX_IPV6, "IPv6 address")
def check_caa_record(self, value, path):
"""Check CAA record Configuration"""
matches = []
if isinstance(value, str):
# Split the record up to the mandatory settings (flags tag "value")
items = value.split(" ", 2)
# Check if the 3 settings are given.
if len(items) != 3:
message = 'CAA record must contain 3 settings (flags tag "value"), record contains {} settings.'
matches.append(RuleMatch(path, message.format(len(items))))
else:
# Check the flag value
if not items[0].isdigit():
message = "CAA record flag setting ({}) should be of type Integer."
extra_args = {
"actual_type": type(items[0]).__name__,
"expected_type": int.__name__,
}
matches.append(
RuleMatch(path, message.format(items[0]), **extra_args)
)
else:
if int(items[0]) not in [0, 128]:
message = "Invalid CAA record flag setting ({}) given, must be 0 or 128."
matches.append(RuleMatch(path, message.format(items[0])))
# Check the tag value
if not re.match(REGEX_ALPHANUMERIC, items[1]):
message = "Invalid CAA record tag setting {}. Value has to be alphanumeric."
matches.append(RuleMatch(path, message.format(items[1])))
# Check the value
if not items[2].startswith('"') or not items[2].endswith('"'):
message = 'CAA record value setting has to be enclosed in double quotation marks (").'
matches.append(RuleMatch(path, message))
return matches
def check_cname_record(self, value, path):
"""Check CNAME record Configuration"""
matches = []
if not isinstance(value, dict):
if not re.match(self.REGEX_DOMAINNAME, value) and not re.match(
self.REGEX_CNAME_VALIDATIONS, value
):
# ACM Route 53 validation uses invalid CNAMEs starting with `_`,
# special-case them rather than complicate the regex.
message = "CNAME record ({}) does not contain a valid domain name"
matches.append(RuleMatch(path, message.format(value)))
return matches
def check_mx_record(self, value, path):
"""Check MX record Configuration"""
matches = []
if isinstance(value, str):
# Split the record up to the mandatory settings (priority domainname)
items = value.split(" ")
# Check if the 3 settings are given.
if len(items) != 2:
message = "MX record must contain 2 settings (priority domainname), record contains {} settings."
matches.append(RuleMatch(path, message.format(len(items), value)))
else:
# Check the priority value
if not items[0].isdigit():
message = (
"MX record priority setting ({}) should be of type Integer."
)
extra_args = {
"actual_type": type(items[0]).__name__,
"expected_type": int.__name__,
}
matches.append(
RuleMatch(path, message.format(items[0], value), **extra_args)
)
else:
if not 0 <= int(items[0]) <= 65535:
message = "Invalid MX record priority setting ({}) given, must be between 0 and 65535."
matches.append(RuleMatch(path, message.format(items[0], value)))
# Check the domainname value
if not re.match(self.REGEX_DOMAINNAME, items[1]):
matches.append(RuleMatch(path, message.format(items[1])))
return matches
def check_ns_record(self, value, path):
return self.check_record(
value, path, "NS", self.REGEX_DOMAINNAME, "domain name"
)
def check_ptr_record(self, value, path):
return self.check_record(
value, path, "PTR", self.REGEX_DOMAINNAME, "domain name"
)
def check_txt_record(self, value, path):
"""Check TXT record Configuration"""
matches = []
if not isinstance(value, dict) and not re.match(self.REGEX_TXT, value):
message = (
"TXT record is not structured as one or more items up to 255 characters "
"enclosed in double quotation marks at {0}"
)
matches.append(
RuleMatch(
path,
(message.format("/".join(map(str, path)))),
)
)
return matches
def check_recordset(self, path, recordset, cfn):
"""Check record configuration"""
matches = []
recordset_type = recordset.get("Type")
# Skip Intrinsic functions
if not isinstance(recordset_type, dict):
if not recordset.get("AliasTarget"):
# If no Alias is specified, ResourceRecords has to be specified
if not recordset.get("ResourceRecords"):
return matches
# Record type specific checks
if recordset_type == "A":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_a_record,
)
)
elif recordset_type == "AAAA":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_aaaa_record,
)
)
elif recordset_type == "CAA":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_caa_record,
)
)
elif recordset_type == "CNAME":
matches.extend(
self.count_c_names(
recordset.get("ResourceRecords"),
path[:] + ["ResourceRecords"],
cfn,
)
)
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_cname_record,
)
)
elif recordset_type == "MX":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_mx_record,
)
)
elif recordset_type == "NS":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_ns_record,
)
)
elif recordset_type == "PTR":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_ptr_record,
)
)
elif recordset_type == "TXT":
matches.extend(
cfn.check_value(
recordset,
"ResourceRecords",
path[:],
check_value=self.check_txt_record,
)
)
else:
if recordset.get("TTL"):
matches.append(
RuleMatch(
path + ["TTL"], "TTL is not allowed for Alias records"
)
)
return matches
def match(self, cfn):
"""Check RecordSets and RecordSetGroups Properties"""
matches = []
recordsets = cfn.get_resources(["AWS::Route53::RecordSet"])
for name, recordset in recordsets.items():
path = ["Resources", name, "Properties"]
if isinstance(recordset, dict):
props = recordset.get("Properties")
if props:
matches.extend(self.check_recordset(path, props, cfn))
recordsetgroups = cfn.get_resource_properties(
["AWS::Route53::RecordSetGroup", "RecordSets"]
)
for recordsetgroup in recordsetgroups:
path = recordsetgroup["Path"]
value = recordsetgroup["Value"]
if isinstance(value, list):
for index, recordset in enumerate(value):
tree = path[:] + [index]
matches.extend(self.check_recordset(tree, recordset, cfn))
return matches