404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.119.28.119: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import json

from cfnlint.rules import CloudFormationLintRule, RuleMatch


class StateMachine(CloudFormationLintRule):
    """Check State Machine Definition"""

    id = "E2532"
    shortdesc = "Check State Machine Definition for proper syntax"
    description = (
        "Check the State Machine String Definition to make sure its JSON. "
        "Validate basic syntax of the file to determine validity."
    )
    source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-stepfunctions-statemachine.html"
    tags = ["resources", "stepfunctions"]

    def __init__(self):
        """Init"""
        super().__init__()
        self.resource_property_types.append("AWS::StepFunctions::StateMachine")

    def _check_state_json(self, def_json, state_name, path):
        """Check State JSON Definition"""
        matches = []

        # https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-common-fields.html
        common_state_keys = [
            "Next",
            "End",
            "Type",
            "Comment",
            "InputPath",
            "OutputPath",
        ]
        common_state_required_keys = [
            "Type",
        ]
        state_key_types = {
            "Pass": ["Result", "ResultPath", "Parameters"],
            "Task": [
                "Credentials",
                "Resource",
                "Parameters",
                "ResultPath",
                "ResultSelector",
                "Retry",
                "Catch",
                "TimeoutSeconds",
                "TimeoutSecondsPath",
                "Parameters",
                "HeartbeatSeconds",
                "HeartbeatSecondsPath",
            ],
            "Map": [
                "MaxConcurrency",
                "Iterator",
                "ItemsPath",
                "ItemProcessor",
                "ItemSelector",
                "ResultPath",
                "ResultSelector",
                "Retry",
                "Catch",
                "Parameters",
            ],
            "Choice": ["Choices", "Default"],
            "Wait": ["Seconds", "Timestamp", "SecondsPath", "TimestampPath"],
            "Succeed": [],
            "Fail": ["Cause", "Error"],
            "Parallel": [
                "Branches",
                "ResultPath",
                "ResultSelector",
                "Parameters",
                "Retry",
                "Catch",
            ],
        }
        state_required_types = {
            "Pass": [],
            "Task": ["Resource"],
            "Choice": ["Choices"],
            "Wait": [],
            "Succeed": [],
            "Fail": [],
            "Parallel": ["Branches"],
        }

        for req_key in common_state_required_keys:
            if req_key not in def_json:
                message = f"State Machine Definition required key ({req_key}) for State ({state_name}) is missing"
                matches.append(RuleMatch(path, message))
                return matches

        state_type = def_json.get("Type")

        if state_type in state_key_types:
            for state_key, _ in def_json.items():
                if state_key not in common_state_keys + state_key_types.get(
                    state_type, []
                ):
                    message = f"State Machine Definition key ({state_key}) for State ({state_name}) of Type ({state_type}) is not valid"
                    matches.append(RuleMatch(path, message))
            for req_key in common_state_required_keys + state_required_types.get(
                state_type, []
            ):
                if req_key not in def_json:
                    message = f"State Machine Definition required key ({req_key}) for State ({state_name}) of Type ({state_type}) is missing"
                    matches.append(RuleMatch(path, message))
                    return matches
        else:
            message = f"State Machine Definition Type ({state_type}) is not valid"
            matches.append(RuleMatch(path, message))

        return matches

    def _check_definition_json(self, def_json, path):
        """Check JSON Definition"""
        matches = []

        top_level_keys = ["Comment", "StartAt", "TimeoutSeconds", "Version", "States"]
        top_level_required_keys = ["StartAt", "States"]
        for top_key, _ in def_json.items():
            if top_key not in top_level_keys:
                message = f"State Machine Definition key ({top_key}) is not valid"
                matches.append(RuleMatch(path, message))

        for req_key in top_level_required_keys:
            if req_key not in def_json:
                message = (
                    f"State Machine Definition required key ({req_key}) is missing"
                )
                matches.append(RuleMatch(path, message))

        for state_name, state_value in def_json.get("States", {}).items():
            matches.extend(self._check_state_json(state_value, state_name, path))
        return matches

    def check_value(self, value, path, fail_on_loads=True):
        """Check Definition Value"""
        matches = []
        try:
            def_json = json.loads(value)
        # pylint: disable=W0703
        except Exception as err:
            if fail_on_loads:
                message = f"State Machine Definition needs to be formatted as JSON. Error {err}"
                matches.append(RuleMatch(path, message))
                return matches

            self.logger.debug("State Machine definition could not be parsed. Skipping")
            return matches

        matches.extend(self._check_definition_json(def_json, path))
        return matches

    def check_sub(self, value, path):
        """Check Sub Object"""
        matches = []
        if isinstance(value, list):
            matches.extend(self.check_value(value[0], path, False))
        elif isinstance(value, str):
            matches.extend(self.check_value(value, path, False))

        return matches

    def match_resource_properties(self, properties, _, path, cfn):
        """Check CloudFormation Properties"""
        matches = []

        matches.extend(
            cfn.check_value(
                obj=properties,
                key="DefinitionString",
                path=path[:],
                check_value=self.check_value,
                check_sub=self.check_sub,
            )
        )

        return matches

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
StateMachine.py File 6.36 KB 0644
__init__.py File 106 B 0644