404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.19.218.250: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.rules import CloudFormationLintRule, RuleMatch


class RuleScheduleExpression(CloudFormationLintRule):
    """Validate AWS Events Schedule expression format

    Checks the ``ScheduleExpression`` property of ``AWS::Events::Rule``
    resources. The value has to be either ``rate(Value Unit)`` or
    ``cron(Minutes Hours Day-of-month Month Day-of-week Year)``.
    """

    id = "E3027"
    shortdesc = "Validate AWS Event ScheduleExpression format"
    description = "Validate the formation of the AWS::Event ScheduleExpression"
    source_url = "https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html"
    tags = ["resources", "events"]

    def initialize(self, cfn):
        """Initialize the rule"""
        self.resource_property_types = ["AWS::Events::Rule"]

    def check_rate(self, value, path):
        """Check Rate configuration

        Args:
            value: the full expression string, e.g. ``rate(5 minutes)``
            path: template path used to build the ``RuleMatch`` objects

        Returns:
            A list of ``RuleMatch`` objects, empty when the rate is valid.
        """
        matches = []
        # Extract the expression from rate(XXX). The LAST ")" is used so that
        # trailing garbage (e.g. "rate(5 minutes) x)") is validated as well
        # instead of being silently dropped.
        rate_expression = value[value.find("(") + 1 : value.rfind(")")]

        if not rate_expression:
            matches.append(
                RuleMatch(path, "Rate value of ScheduleExpression cannot be empty")
            )
            return matches

        # Rate format: rate(Value Unit)
        items = rate_expression.split(" ")

        if len(items) != 2:
            message = "Rate expression must contain 2 elements (Value Unit), rate contains {} elements"
            matches.append(RuleMatch(path, message.format(len(items))))
            return matches

        rate_value, rate_unit = items

        # Check the Value. str.isdigit() alone also accepts characters such as
        # superscript digits which int() cannot parse, hence the isascii().
        if not (rate_value.isascii() and rate_value.isdigit()):
            message = "Rate Value ({}) should be of type Integer."
            extra_args = {
                "actual_type": type(rate_value).__name__,
                "expected_type": int.__name__,
            }
            matches.append(
                RuleMatch(path, message.format(rate_value), **extra_args)
            )
            return matches

        number = int(rate_value)
        if number == 0:
            # The value of a rate expression has to be a positive number
            message = "Rate Value ({}) should be greater than 0."
            matches.append(RuleMatch(path, message.format(rate_value)))
            return matches

        # Check the Unit: singular when the value is 1, plural otherwise
        if number == 1:
            valid_units = ("minute", "hour", "day")
        else:
            valid_units = ("minutes", "hours", "days")

        if rate_unit not in valid_units:
            message = "Rate Unit ({}) should be one of ({}) when the Rate Value is {}."
            matches.append(
                RuleMatch(
                    path,
                    message.format(rate_unit, ", ".join(valid_units), number),
                )
            )

        return matches

    def check_cron(self, value, path):
        """Check Cron configuration

        Args:
            value: the full expression string, e.g. ``cron(0 12 * * ? *)``
            path: template path used to build the ``RuleMatch`` objects

        Returns:
            A list of ``RuleMatch`` objects, empty when no problem was found.
        """
        matches = []
        # Extract the expression from cron(XXX). The LAST ")" is used so that
        # trailing garbage after an inner ")" is validated, not dropped.
        cron_expression = value[value.find("(") + 1 : value.rfind(")")]

        if not cron_expression:
            matches.append(
                RuleMatch(path, "Cron value of ScheduleExpression cannot be empty")
            )
            return matches

        # Cron format: cron(Minutes Hours Day-of-month Month Day-of-week Year)
        items = cron_expression.split(" ")

        if len(items) != 6:
            message = "Cron expression must contain 6 elements (Minutes Hours Day-of-month Month Day-of-week Year), cron contains {} elements"
            matches.append(RuleMatch(path, message.format(len(items))))
            return matches

        # Parentheses are not part of the cron field syntax; one showing up
        # here means the expression was wrapped or closed more than once.
        if "(" in cron_expression or ")" in cron_expression:
            matches.append(
                RuleMatch(
                    path,
                    "Cron expression fields cannot contain parentheses",
                )
            )
            return matches

        _, _, day_of_month, _, day_of_week, _ = items
        if day_of_month != "?" and day_of_week != "?":
            matches.append(
                RuleMatch(
                    path,
                    "Don't specify the Day-of-month and Day-of-week fields in the same cron expression",
                )
            )

        return matches

    def check_value(self, value, path):
        """Check a ScheduleExpression value

        Dispatches to ``check_rate`` or ``check_cron`` depending on the
        prefix of the value; anything else is reported as invalid.
        """
        matches = []

        # Value is either "cron()" or "rate()". Non-string scalars (e.g. a
        # bare number in the template) fall through to the generic error
        # below instead of raising AttributeError on startswith().
        if isinstance(value, str) and value.endswith(")"):
            if value.startswith("rate("):
                matches.extend(self.check_rate(value, path))
                return matches
            if value.startswith("cron("):
                matches.extend(self.check_cron(value, path))
                return matches

        message = "Invalid ScheduledExpression specified ({}). Value has to be either cron() or rate()"
        matches.append(RuleMatch(path, message.format(value)))

        return matches

    def match_resource_properties(self, properties, _, path, cfn):
        """Check CloudFormation Properties

        Runs ``check_value`` on the ``ScheduleExpression`` key of the given
        resource properties and returns the collected matches.
        """
        matches = []

        matches.extend(
            cfn.check_value(
                obj=properties,
                key="ScheduleExpression",
                path=path[:],
                check_value=self.check_value,
            )
        )

        return matches

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
RuleScheduleExpression.py File 4.02 KB 0644
RuleTargetsLimit.py File 1.81 KB 0644
__init__.py File 106 B 0644