404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.16.139.76: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

import regex as re

from cfnlint.helpers import REGEX_ALPHANUMERIC, REGEX_IPV4, REGEX_IPV6
from cfnlint.rules import CloudFormationLintRule, RuleMatch


class RecordSet(CloudFormationLintRule):
    """Check Route53 Recordset Configuration

    Validates the ``ResourceRecords`` of ``AWS::Route53::RecordSet`` resources
    and of each entry in ``AWS::Route53::RecordSetGroup`` ``RecordSets``.
    A format check is applied per record type (A, AAAA, CAA, CNAME, MX, NS,
    PTR, TXT), and a ``TTL`` set on a record that has an ``AliasTarget`` is
    reported.
    """

    id = "E3020"
    shortdesc = "Validate Route53 RecordSets"
    description = "Check if all RecordSets are correctly configured"
    source_url = "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/ResourceRecordTypes.html"
    tags = ["resources", "route53", "record_set"]

    # Regex generated from https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DomainNameFormat.html
    REGEX_DOMAINNAME = re.compile(
        r"^[a-zA-Z0-9\!\"\#\$\%\&\'\(\)\*\+\,-\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~\.]+$"
    )
    # One or more double-quoted items of 1-255 characters, separated by spaces
    REGEX_TXT = re.compile(r'^("[^"]{1,255}" *)*"[^"]{1,255}"$')
    # Names ending in ".acm-validations.aws" (optionally with a trailing dot)
    REGEX_CNAME_VALIDATIONS = re.compile(r"^.*\.acm-validations\.aws\.?$")

    def count_c_names(self, records, path, cfn):
        """Report CNAME record sets that hold more than one value.

        Args:
            records: the ``ResourceRecords`` value of the record set.
            path: template path (list of keys/indexes) of ``records``.
            cfn: template object; its ``get_object_without_nested_conditions``
                is used to expand ``records`` into per-condition scenarios.

        Returns:
            A list of ``RuleMatch``, one per scenario whose resolved object
            has more than one element.
        """
        matches = []

        scenarios = cfn.get_object_without_nested_conditions(records, path)
        for scenario in scenarios:
            if len(scenario.get("Object")) <= 1:
                continue

            conditions = scenario.get("Scenario")
            if conditions is None:
                # No conditions involved: the plain message is sufficient.
                message = "A CNAME recordset can only contain 1 value"
            else:
                # Spell out which condition values lead to the violation.
                scenario_text = " and ".join(
                    [f'when condition "{k}" is {v}' for (k, v) in conditions.items()]
                )
                message = (
                    "A CNAME recordset can only contain 1 value {0} at {1}".format(
                        scenario_text, "/".join(map(str, path))
                    )
                )
            matches.append(RuleMatch(path, message))

        return matches

    def check_record(self, value, path, record_type, regex, regex_name):
        """Match a string record value against ``regex``.

        Args:
            value: the record value; non-string values are ignored.
            path: template path of the value, used for the match location.
            record_type: record type name used in the message (e.g. ``"A"``).
            regex: pattern the value has to match from its start.
            regex_name: human readable name of what the pattern describes.

        Returns:
            A list with one ``RuleMatch`` when ``value`` is a string that
            does not match, otherwise an empty list.
        """
        matches = []
        if isinstance(value, str) and not re.match(regex, value):
            message = record_type + " record ({}) is not a valid " + regex_name
            matches.append(RuleMatch(path, message.format(value)))
        return matches

    def check_a_record(self, value, path):
        """Check that an A record value is an IPv4 address."""
        return self.check_record(value, path, "A", REGEX_IPV4, "IPv4 address")

    def check_aaaa_record(self, value, path):
        """Check that an AAAA record value is an IPv6 address."""
        return self.check_record(value, path, "AAAA", REGEX_IPV6, "IPv6 address")

    def check_caa_record(self, value, path):
        """Check CAA record Configuration

        A string value is expected in the form ``flags tag "value"``: the
        flag must be the integer 0 or 128, the tag alphanumeric, and the
        value enclosed in double quotes. Non-string values are ignored.

        Returns:
            A list of ``RuleMatch`` for every violated constraint.
        """
        matches = []

        if not isinstance(value, str):
            return matches

        # Split the record up to the mandatory settings (flags tag "value");
        # the value part may itself contain spaces, hence maxsplit=2.
        items = value.split(" ", 2)
        if len(items) != 3:
            message = 'CAA record must contain 3 settings (flags tag "value"), record contains {} settings.'
            matches.append(RuleMatch(path, message.format(len(items))))
            return matches

        flag, tag, quoted_value = items

        # Check the flag value
        if not flag.isdigit():
            message = "CAA record flag setting ({}) should be of type Integer."
            extra_args = {
                "actual_type": type(flag).__name__,
                "expected_type": int.__name__,
            }
            matches.append(RuleMatch(path, message.format(flag), **extra_args))
        elif int(flag) not in [0, 128]:
            message = "Invalid CAA record flag setting ({}) given, must be 0 or 128."
            matches.append(RuleMatch(path, message.format(flag)))

        # Check the tag value
        if not re.match(REGEX_ALPHANUMERIC, tag):
            message = "Invalid CAA record tag setting {}. Value has to be alphanumeric."
            matches.append(RuleMatch(path, message.format(tag)))

        # Check the value
        if not quoted_value.startswith('"') or not quoted_value.endswith('"'):
            message = 'CAA record value setting has to be enclosed in double quotation marks (").'
            matches.append(RuleMatch(path, message))

        return matches

    def check_cname_record(self, value, path):
        """Check CNAME record Configuration

        A string value must match either the domain name pattern or the
        ACM validation pattern. Non-string values are ignored, consistent
        with the other record checks (passing them to ``re.match`` would
        raise ``TypeError``).
        """
        matches = []

        if isinstance(value, str):
            if not re.match(self.REGEX_DOMAINNAME, value) and not re.match(
                self.REGEX_CNAME_VALIDATIONS, value
            ):
                # ACM Route 53 validation uses invalid CNAMEs starting with `_`,
                # special-case them rather than complicate the regex.
                message = "CNAME record ({}) does not contain a valid domain name"
                matches.append(RuleMatch(path, message.format(value)))

        return matches

    def check_mx_record(self, value, path):
        """Check MX record Configuration

        A string value is expected in the form ``priority domainname``,
        separated by a single space: the priority must be an integer in
        the range 0-65535 and the domain name must match the domain name
        pattern. Non-string values are ignored.

        Returns:
            A list of ``RuleMatch`` for every violated constraint.
        """
        matches = []

        if not isinstance(value, str):
            return matches

        # Split the record up to the mandatory settings (priority domainname)
        items = value.split(" ")

        # Check if the 2 settings are given.
        if len(items) != 2:
            message = "MX record must contain 2 settings (priority domainname), record contains {} settings."
            matches.append(RuleMatch(path, message.format(len(items))))
            return matches

        priority, domain_name = items

        # Check the priority value
        if not priority.isdigit():
            message = "MX record priority setting ({}) should be of type Integer."
            extra_args = {
                "actual_type": type(priority).__name__,
                "expected_type": int.__name__,
            }
            matches.append(RuleMatch(path, message.format(priority), **extra_args))
        elif not 0 <= int(priority) <= 65535:
            message = "Invalid MX record priority setting ({}) given, must be between 0 and 65535."
            matches.append(RuleMatch(path, message.format(priority)))

        # Check the domainname value. This needs its own message: reusing the
        # priority message would be misleading, and no message exists at all
        # when the priority is valid.
        if not re.match(self.REGEX_DOMAINNAME, domain_name):
            message = "MX record domain name setting ({}) is not a valid domain name."
            matches.append(RuleMatch(path, message.format(domain_name)))

        return matches

    def check_ns_record(self, value, path):
        """Check that an NS record value matches the domain name pattern."""
        return self.check_record(
            value, path, "NS", self.REGEX_DOMAINNAME, "domain name"
        )

    def check_ptr_record(self, value, path):
        """Check that a PTR record value matches the domain name pattern."""
        return self.check_record(
            value, path, "PTR", self.REGEX_DOMAINNAME, "domain name"
        )

    def check_txt_record(self, value, path):
        """Check TXT record Configuration

        A string value must consist of one or more double-quoted items of
        1-255 characters each. Non-string values are ignored, consistent
        with the other record checks (passing them to ``re.match`` would
        raise ``TypeError``).
        """
        matches = []

        if isinstance(value, str) and not re.match(self.REGEX_TXT, value):
            message = (
                "TXT record is not structured as one or more items up to 255 characters "
                "enclosed in double quotation marks at {0}"
            )
            matches.append(
                RuleMatch(
                    path,
                    (message.format("/".join(map(str, path)))),
                )
            )

        return matches

    def check_recordset(self, path, recordset, cfn):
        """Check record configuration

        Args:
            path: template path (list) of the record set properties.
            recordset: the record set properties; read via ``.get``.
            cfn: template object providing ``check_value`` and the helper
                used by ``count_c_names``.

        Returns:
            A list of ``RuleMatch``. Nothing is checked when ``Type`` is a
            dict (intrinsic function). For alias records only a set ``TTL``
            is reported; otherwise ``ResourceRecords`` is validated by the
            checker for the record type, if there is one.
        """
        matches = []
        recordset_type = recordset.get("Type")

        # Skip Intrinsic functions
        if isinstance(recordset_type, dict):
            return matches

        if recordset.get("AliasTarget"):
            if recordset.get("TTL"):
                matches.append(
                    RuleMatch(path + ["TTL"], "TTL is not allowed for Alias records")
                )
            return matches

        # If no Alias is specified, ResourceRecords has to be specified;
        # without records there is nothing to validate here.
        if not recordset.get("ResourceRecords"):
            return matches

        # Record type specific checks. Compared with ``==`` (not a dict
        # lookup) so unhashable or unusual ``Type`` values behave as before.
        record_checks = (
            ("A", self.check_a_record),
            ("AAAA", self.check_aaaa_record),
            ("CAA", self.check_caa_record),
            ("CNAME", self.check_cname_record),
            ("MX", self.check_mx_record),
            ("NS", self.check_ns_record),
            ("PTR", self.check_ptr_record),
            ("TXT", self.check_txt_record),
        )
        for type_name, checker in record_checks:
            if recordset_type != type_name:
                continue
            if type_name == "CNAME":
                # A CNAME may hold a single value only; check the count
                # before validating the individual values.
                matches.extend(
                    self.count_c_names(
                        recordset.get("ResourceRecords"),
                        path[:] + ["ResourceRecords"],
                        cfn,
                    )
                )
            matches.extend(
                cfn.check_value(
                    recordset,
                    "ResourceRecords",
                    path[:],
                    check_value=checker,
                )
            )
            break

        return matches

    def match(self, cfn):
        """Check RecordSets and RecordSetGroups Properties

        Args:
            cfn: template object providing ``get_resources`` and
                ``get_resource_properties``.

        Returns:
            A list of ``RuleMatch`` collected from every
            ``AWS::Route53::RecordSet`` resource and every list entry of
            ``RecordSets`` in ``AWS::Route53::RecordSetGroup`` resources.
        """
        matches = []

        # Stand-alone record sets
        recordsets = cfn.get_resources(["AWS::Route53::RecordSet"])
        for name, recordset in recordsets.items():
            if not isinstance(recordset, dict):
                continue
            props = recordset.get("Properties")
            if props:
                path = ["Resources", name, "Properties"]
                matches.extend(self.check_recordset(path, props, cfn))

        # Record sets nested in record set groups
        recordsetgroups = cfn.get_resource_properties(
            ["AWS::Route53::RecordSetGroup", "RecordSets"]
        )
        for recordsetgroup in recordsetgroups:
            path = recordsetgroup["Path"]
            value = recordsetgroup["Value"]
            if isinstance(value, list):
                for index, recordset in enumerate(value):
                    tree = path[:] + [index]
                    matches.extend(self.check_recordset(tree, recordset, cfn))

        return matches

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
RecordSet.py File 12.41 KB 0644
RecordSetName.py File 3.6 KB 0644
__init__.py File 106 B 0644