404

[ Avaa Bypassed ]




Upload:

Command:

botdev@13.58.146.48: ~ $
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import json
from datetime import date

from cfnlint.helpers import FUNCTIONS_SINGLE, convert_dict
from cfnlint.rules import CloudFormationLintRule, RuleMatch


class Policy(CloudFormationLintRule):
    """Check if IAM Policy JSON is correct"""

    id = "E2507"
    shortdesc = "Check if IAM Policies are properly configured"
    description = "See if there elements inside an IAM policy are correct"
    source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-iam-policy.html"
    tags = ["properties", "iam"]

    def __init__(self):
        """Init"""
        super().__init__()
        self.resource_exceptions = {
            "AWS::ECR::Repository": "RepositoryPolicyText",
        }
        self.resources_and_keys = {
            "AWS::ECR::Repository": "RepositoryPolicyText",
            "AWS::Elasticsearch::Domain": "AccessPolicies",
            "AWS::OpenSearchService::Domain": "AccessPolicies",
            "AWS::KMS::Key": "KeyPolicy",
            "AWS::S3::BucketPolicy": "PolicyDocument",
            "AWS::SNS::TopicPolicy": "PolicyDocument",
            "AWS::SQS::QueuePolicy": "PolicyDocument",
        }
        self.idp_and_keys = {
            "AWS::IAM::Group": "Policies",
            "AWS::IAM::ManagedPolicy": "PolicyDocument",
            "AWS::IAM::Policy": "PolicyDocument",
            "AWS::IAM::Role": "Policies",
            "AWS::IAM::User": "Policies",
            "AWS::SSO::PermissionSet": "InlinePolicy",
        }
        for resource_type in self.resources_and_keys:
            self.resource_property_types.append(resource_type)
        for resource_type in self.idp_and_keys:
            self.resource_property_types.append(resource_type)

    def check_policy_document(
        self, value, path, is_identity_policy, resource_exceptions, start_mark, end_mark
    ):
        """Check policy document"""
        matches = []

        valid_keys = [
            "Version",
            "Id",
            "Statement",
        ]
        valid_versions = [
            "2012-10-17",
            "2008-10-17",
            date(2012, 10, 17),
            date(2008, 10, 17),
        ]

        if isinstance(value, str):
            try:
                value = convert_dict(json.loads(value), start_mark, end_mark)
            except Exception as ex:  # pylint: disable=W0703,W0612
                message = "IAM Policy Documents need to be JSON"
                matches.append(RuleMatch(path[:], message))
                return matches

        if not isinstance(value, dict):
            message = "IAM Policy Documents needs to be JSON"
            matches.append(RuleMatch(path[:], message))
            return matches

        for p_vs, p_p in value.items_safe(path[:], (dict)):
            for parent_key, parent_value in p_vs.items():
                if parent_key not in valid_keys:
                    message = f"IAM Policy key {parent_key} doesn't exist."
                    matches.append(RuleMatch(path[:] + p_p + [parent_key], message))
                if parent_key == "Version":
                    if parent_value not in valid_versions:
                        message = f'IAM Policy Version needs to be one of ({", ".join(map(str, ["2012-10-17", "2008-10-17"]))}).'
                        matches.append(RuleMatch(p_p + [parent_key], message))
                if parent_key == "Statement":
                    if isinstance(parent_value, list):
                        for i_s_v, i_s_p in parent_value.items_safe(
                            p_p + ["Statement"], (dict)
                        ):
                            matches.extend(
                                self._check_policy_statement(
                                    i_s_p,
                                    i_s_v,
                                    is_identity_policy,
                                    resource_exceptions,
                                )
                            )
                    elif isinstance(parent_value, dict):
                        for i_s_v, i_s_p in parent_value.items_safe(
                            p_p + ["Statement"]
                        ):
                            matches.extend(
                                self._check_policy_statement(
                                    i_s_p,
                                    i_s_v,
                                    is_identity_policy,
                                    resource_exceptions,
                                )
                            )
                    else:
                        message = "IAM Policy statement should be of list."
                        matches.append(RuleMatch(p_p + [parent_key], message))
        return matches

    def _check_policy_statement(
        self, branch, statement, is_identity_policy, resource_exceptions
    ):
        """Check statements"""
        matches = []
        statement_valid_keys = [
            "Action",
            "Condition",
            "Effect",
            "NotAction",
            "NotPrincipal",
            "NotResource",
            "Principal",
            "Resource",
            "Sid",
        ]

        for key, _ in statement.items():
            if key not in statement_valid_keys:
                message = f"IAM Policy statement key {key} isn't valid"
                matches.append(RuleMatch(branch[:] + [key], message))
        if "Effect" not in statement:
            message = "IAM Policy statement missing Effect"
            matches.append(RuleMatch(branch[:], message))
        else:
            for effect, effect_path in statement.get_safe("Effect"):
                if isinstance(effect, str):
                    if effect not in ["Allow", "Deny"]:
                        message = "IAM Policy Effect should be Allow or Deny"
                        matches.append(RuleMatch(branch[:] + effect_path, message))
        if "Action" not in statement and "NotAction" not in statement:
            message = "IAM Policy statement missing Action or NotAction"
            matches.append(RuleMatch(branch[:], message))
        if is_identity_policy:
            if "Principal" in statement or "NotPrincipal" in statement:
                message = "IAM Resource Policy statement shouldn't have Principal or NotPrincipal"
                matches.append(RuleMatch(branch[:], message))
        else:
            if "Principal" not in statement and "NotPrincipal" not in statement:
                message = "IAM Resource Policy statement should have Principal or NotPrincipal"
                matches.append(RuleMatch(branch[:] + ["Principal"], message))
        if not resource_exceptions:
            if "Resource" not in statement and "NotResource" not in statement:
                message = "IAM Policy statement missing Resource or NotResource"
                matches.append(RuleMatch(branch[:], message))

        resources = statement.get("Resource", [])
        if isinstance(resources, str):
            resources = [resources]

        for index, resource in enumerate(resources):
            if isinstance(resource, dict):
                if len(resource) == 1:
                    for k in resource.keys():
                        if k not in FUNCTIONS_SINGLE:
                            message = (
                                "IAM Policy statement Resource incorrectly formatted"
                            )
                            matches.append(
                                RuleMatch(branch[:] + ["Resource", index], message)
                            )
                else:
                    message = "IAM Policy statement Resource incorrectly formatted"
                    matches.append(RuleMatch(branch[:] + ["Resource", index], message))

        return matches

    def match_resource_properties(self, properties, resourcetype, path, cfn):
        """Check CloudFormation Properties"""
        matches = []

        is_identity_policy = True
        if resourcetype in self.resources_and_keys:
            is_identity_policy = False

        key = None
        if resourcetype in self.resources_and_keys:
            key = self.resources_and_keys.get(resourcetype)
        else:
            key = self.idp_and_keys.get(resourcetype)

        if not key:
            # Key isn't defined return nothing
            return matches

        resource_exceptions = False
        if key == self.resource_exceptions.get(resourcetype):
            resource_exceptions = True

        other_keys = []
        for key, value in self.resources_and_keys.items():
            if value != "Policies":
                other_keys.append(key)
        for key, value in self.idp_and_keys.items():
            if value != "Policies":
                other_keys.append(key)

        for key, value in properties.items():
            if key == "Policies" and isinstance(value, list):
                for index, policy in enumerate(properties.get(key, [])):
                    matches.extend(
                        cfn.check_value(
                            obj=policy,
                            key="PolicyDocument",
                            path=path[:] + ["Policies", index],
                            check_value=self.check_policy_document,
                            is_identity_policy=is_identity_policy,
                            resource_exceptions=resource_exceptions,
                            start_mark=key.start_mark,
                            end_mark=key.end_mark,
                        )
                    )
            elif key in [
                "KeyPolicy",
                "PolicyDocument",
                "RepositoryPolicyText",
                "AccessPolicies",
                "InlinePolicy",
            ]:
                matches.extend(
                    cfn.check_value(
                        obj=properties,
                        key=key,
                        path=path[:],
                        check_value=self.check_policy_document,
                        is_identity_policy=is_identity_policy,
                        resource_exceptions=resource_exceptions,
                        start_mark=key.start_mark,
                        end_mark=key.end_mark,
                    )
                )

        return matches

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
Permissions.py File 7.78 KB 0644
Policy.py File 10.15 KB 0644
PolicyVersion.py File 3.26 KB 0644
RefWithPath.py File 1.81 KB 0644
__init__.py File 106 B 0644