404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.149.253.148: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import fnmatch
import json

from cfnlint.data import AdditionalSpecs
from cfnlint.helpers import convert_dict, load_resource
from cfnlint.rules import CloudFormationLintRule, RuleMatch


class Permissions(CloudFormationLintRule):
    """Check IAM Permission configuration

    Looks at the ``Action`` / ``NotAction`` entries of policy documents found
    on the resource types listed in ``IAM_PERMISSION_RESOURCE_TYPES`` and
    reports service prefixes and permissions that are not present in the
    service map loaded from ``Policies.json``.
    """

    id = "W3037"
    shortdesc = "Check IAM Permission configuration"
    description = "Check for valid IAM Permissions"
    source_url = "https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_action.html"
    tags = ["properties", "iam", "permissions"]
    experimental = True

    # Resource type -> name of the property that carries the policy document(s).
    IAM_PERMISSION_RESOURCE_TYPES = {
        "AWS::Elasticsearch::Domain": "AccessPolicies",
        "AWS::OpenSearchService::Domain": "AccessPolicies",
        "AWS::IAM::Group": "Policies",
        "AWS::IAM::ManagedPolicy": "PolicyDocument",
        "AWS::IAM::Policy": "PolicyDocument",
        "AWS::IAM::Role": "Policies",
        "AWS::IAM::User": "Policies",
        "AWS::KMS::Key": "KeyPolicy",
        "AWS::S3::BucketPolicy": "PolicyDocument",
        "AWS::SNS::TopicPolicy": "PolicyDocument",
        "AWS::SQS::QueuePolicy": "PolicyDocument",
    }

    def __init__(self):
        """Load the service map and register every supported resource type."""
        super().__init__()
        self.service_map = self.load_service_map()
        # Iterating the dict yields its keys, i.e. the resource type names.
        self.resource_property_types.extend(self.IAM_PERMISSION_RESOURCE_TYPES)

    def load_service_map(self):
        """
        Convert policies.json into a simpler version for more efficient key lookup.

        Returns:
            dict mapping each lower-cased service prefix to a list of its
            lower-cased action names.
        """
        service_map = load_resource(AdditionalSpecs, "Policies.json")["serviceMap"]

        policy_service_map = {}

        for properties in service_map.values():
            # The services and actions are case insensitive
            service = properties["StringPrefix"].lower()
            actions = [x.lower() for x in properties["Actions"]]

            # Some services have the same name for different generations;
            # like elasticloadbalancing. Merge their actions under one key.
            policy_service_map.setdefault(service, []).extend(actions)

        return policy_service_map

    def check_policy_document(self, value, path, start_mark, end_mark):
        """Check policy document

        Args:
            value: the policy document, either already a mapping or a JSON
                string that is decoded first.
            path: template path of the policy document, used for matches.
            start_mark: passed through to ``convert_dict`` for JSON strings.
            end_mark: passed through to ``convert_dict`` for JSON strings.

        Returns:
            list of RuleMatch, one per invalid action found.
        """
        matches = []

        if isinstance(value, str):
            try:
                value = convert_dict(json.loads(value), start_mark, end_mark)
            except Exception:  # pylint: disable=W0703
                # Deliberately broad: any failure to decode is reported as a
                # lint finding rather than aborting the run.
                message = "IAM Policy Documents need to be JSON"
                matches.append(RuleMatch(path[:], message))
                return matches

        if not isinstance(value, dict):
            return matches

        # NOTE(review): items_safe is provided by cfn-lint's dict node type;
        # it is used here as yielding (document, path) pairs.
        for p_vs, p_p in value.items_safe(path[:], (dict)):
            statements = p_vs.get("Statement")
            if not statements:
                continue
            # A single statement may be given without the enclosing list.
            if isinstance(statements, dict):
                statements = [statements]
            if not isinstance(statements, list):
                continue

            for index, statement in enumerate(statements):
                if isinstance(statement, list):
                    actions = []
                    for effective_permission in statement:
                        actions.extend(self.get_actions(effective_permission))
                else:
                    # get_actions returns [] for anything that is not a dict.
                    actions = self.get_actions(statement)

                for action in actions:
                    matches.extend(
                        self.check_permissions(action, p_p + ["Statement", index])
                    )

        return matches

    def check_permissions(self, action, path):
        """
        Check if permission is valid

        Args:
            action: a single entry taken from ``Action`` / ``NotAction``.
            path: template path reported on any match.

        Returns:
            list of RuleMatch; empty when the action is acceptable or cannot
            be evaluated (not a string, or not in ``service:permission`` form).
        """
        matches = []

        # Non-string entries (for example intrinsic function dicts) cannot be
        # checked statically, and a bare "*" allows everything.
        if not isinstance(action, str) or action == "*":
            return matches

        service, separator, permission = action.partition(":")
        if not separator:
            # No "service:" prefix, so there is nothing to look up. Skip
            # instead of raising on the unpack as the old split() did.
            return matches

        # Get lowercase so we can check case insensitive. Keep the original
        # values for the message
        service_value = service.lower()
        permission_value = permission.lower()

        if service_value not in self.service_map:
            message = 'Invalid service "{}" found in permissions'
            matches.append(RuleMatch(path, message.format(service)))
            return matches

        if permission_value == "*":
            return matches

        known_actions = self.service_map[service_value]
        if "*" in permission_value or "?" in permission_value:
            # Anchored glob match over the whole action name, so wildcards at
            # the start, middle or end all behave the same way. "[" has no
            # special meaning here, so neutralise it for fnmatch.
            pattern = permission_value.replace("[", "[[]")
            is_valid = any(
                fnmatch.fnmatchcase(known_action, pattern)
                for known_action in known_actions
            )
        else:
            is_valid = permission_value in known_actions

        if not is_valid:
            message = 'Invalid permission "{}" for "{}" found in permissions'
            matches.append(RuleMatch(path, message.format(permission, service)))

        return matches

    def get_actions(self, effective_permissions):
        """return all actions from a statement

        Collects the values of both ``Action`` and ``NotAction``; each may be
        a single string or a list. Anything that is not a dict yields ``[]``.
        """
        actions = []

        if not isinstance(effective_permissions, dict):
            return actions

        for element in ("Action", "NotAction"):
            # Each element is checked against its own value (the NotAction
            # list branch used to inspect "Action" by mistake).
            element_value = effective_permissions.get(element)
            if isinstance(element_value, str):
                actions.append(element_value)
            elif isinstance(element_value, list):
                actions.extend(element_value)

        return actions

    def match_resource_properties(self, properties, resourcetype, path, cfn):
        """Check CloudFormation Properties

        Args:
            properties: the resource's Properties mapping.
            resourcetype: resource type name (not needed for the lookup, the
                policy-carrying properties are recognised by key name).
            path: template path of ``properties``.
            cfn: template object providing ``check_value``.

        Returns:
            list of RuleMatch gathered from every policy document found.
        """
        matches = []

        for key, value in properties.items():
            if key == "Policies" and isinstance(value, list):
                # Inline policies: each list item holds its own PolicyDocument.
                for index, policy in enumerate(value):
                    matches.extend(
                        cfn.check_value(
                            obj=policy,
                            key="PolicyDocument",
                            path=path[:] + ["Policies", index],
                            check_value=self.check_policy_document,
                            start_mark=key.start_mark,
                            end_mark=key.end_mark,
                        )
                    )
            elif key in (
                "KeyPolicy",
                "PolicyDocument",
                "RepositoryPolicyText",
                "AccessPolicies",
            ):
                # The property itself is the policy document.
                matches.extend(
                    cfn.check_value(
                        obj=properties,
                        key=key,
                        path=path[:],
                        check_value=self.check_policy_document,
                        start_mark=key.start_mark,
                        end_mark=key.end_mark,
                    )
                )

        return matches

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
Permissions.py File 7.78 KB 0644
Policy.py File 10.15 KB 0644
PolicyVersion.py File 3.26 KB 0644
RefWithPath.py File 1.81 KB 0644
__init__.py File 106 B 0644