404

[ Avaa Bypassed ]




Upload:

Command:

botdev@18.218.242.255: ~ $
"""
Helpers for loading resources, managing specs, constants, etc.

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import logging
import warnings
from typing import Any, List

import networkx
import regex as re

LOGGER = logging.getLogger("cfnlint.graph")


class EdgeSetting:
    color: str = "black"
    labal: str = ""

    def __init__(self, color: str, label: str) -> None:
        self.color = color
        self.label = label


class NodeSetting:
    color: str = "black"
    shape: str
    node_type: str

    def __init__(self, color: str, node_type: str, shape: str = "ellipse") -> None:
        self.color = color
        self.shape = shape
        self.node_type = node_type


class GraphSettings:
    ref: EdgeSetting
    getatt: EdgeSetting
    depends_on: EdgeSetting
    resource: NodeSetting
    parameter: NodeSetting
    output: NodeSetting

    def subgraph_view(self, graph) -> networkx.MultiDiGraph:
        view = networkx.MultiDiGraph(name="template")
        resources: List[str] = [
            n for n, v in graph.nodes.items() if v["type"] in ["Resource"]
        ]
        view.add_nodes_from((n, graph.nodes[n]) for n in resources)
        view.add_edges_from(
            (n, nbr, key, d)
            for n, nbrs in graph.adj.items()
            if n in resources
            for nbr, keydict in nbrs.items()
            if nbr in resources
            for key, d in keydict.items()
        )
        view.graph.update(graph.graph)
        return view


class DefaultGraphSettings(GraphSettings):
    ref = EdgeSetting(color="black", label="Ref")
    getatt = EdgeSetting(color="black", label="GetAtt")
    depends_on = EdgeSetting(color="black", label="DependsOn")
    resource = NodeSetting(color="black", node_type="Resource")
    parameter = NodeSetting(color="black", node_type="Parameter", shape="box")
    output = NodeSetting(color="black", node_type="Output", shape="box")


class Graph:
    """Models a template as a directed graph of resources"""

    settings: GraphSettings
    __supported_types: List[str] = ["Resources", "Parameters", "Outputs"]

    def __init__(self, cfn):
        """Builds a graph where resources are nodes and edges are explicit (DependsOn) or implicit (Fn::GetAtt, Fn::Sub, Ref)
        relationships between resources"""

        self.settings = DefaultGraphSettings()

        # Directed graph that allows self loops and parallel edges
        self.graph = networkx.MultiDiGraph(name="template")

        self._add_resources(cfn)
        self._add_parameters(cfn)
        self._add_outputs(cfn)
        self._add_refs(cfn)
        self._add_getatts(cfn)
        self._add_subs(cfn)

    def get_cycles(self, cfn):
        """Return all resource pairs that have a cycle in them"""
        result = []
        for starting_resource in cfn.template.get("Resources", {}):
            try:
                for edge in list(networkx.find_cycle(self.graph, starting_resource)):
                    if edge not in result:
                        result.append(edge)
            except networkx.NetworkXNoCycle:
                continue
        return result

    def _add_parameters(self, cfn: Any) -> None:
        # add all parameters in the template as nodes
        for parameter_id, parameter_values in cfn.template.get(
            "Parameters", {}
        ).items():
            if not isinstance(parameter_values, dict):
                continue
            type_val = parameter_values.get("Type", "")
            if not isinstance(type_val, str):
                continue
            graph_label = str.format(f'"{parameter_id}\\n<{type_val}>"')
            self._add_node(
                parameter_id, label=graph_label, settings=self.settings.parameter
            )

    def _add_outputs(self, cfn: Any) -> None:
        # add all outputs in the template as nodes
        for output_id in cfn.template.get("Outputs", {}).keys():
            graph_label = str.format(f'"{output_id}"')
            self._add_node(output_id, label=graph_label, settings=self.settings.output)

    def _add_resources(self, cfn: Any):
        # add all resources in the template as nodes
        for resourceId, resourceVals in cfn.template.get("Resources", {}).items():
            if not isinstance(resourceVals, dict):
                continue
            type_val = resourceVals.get("Type", "")
            if not isinstance(type_val, str):
                continue
            graph_label = str.format(f'"{resourceId}\\n<{type_val}>"')
            self._add_node(
                resourceId, label=graph_label, settings=self.settings.resource
            )
            target_ids = resourceVals.get("DependsOn", [])
            if isinstance(target_ids, (list, str)):
                if isinstance(target_ids, (str)):
                    target_ids = [target_ids]
                for target_id in target_ids:
                    if isinstance(target_id, str):
                        if self._is_resource(cfn, target_id):
                            self._add_edge(
                                resourceId,
                                target_id,
                                ["DependsOn"],
                                self.settings.depends_on,
                            )

    def _add_refs(self, cfn: Any) -> None:
        # add edges for "Ref" tags. { "Ref" : "logicalNameOfResource" }
        refs_paths = cfn.search_deep_keys("Ref")
        for ref_path in refs_paths:
            ref_type, source_id = ref_path[:2]
            source_path = ref_path[2:-2]
            target_id = ref_path[-1]
            if not ref_type in self.__supported_types:
                continue

            if ref_type in ["Parameters", "Outputs"]:
                source_id = f"{ref_type[:-1]}-{source_id}"

            if isinstance(target_id, (str, int)) and (
                self._is_resource(cfn, target_id)
            ):
                self._add_edge(source_id, target_id, source_path, self.settings.ref)

    def _add_getatts(self, cfn: Any) -> None:
        # add edges for "Fn::GetAtt" tags.
        # { "Fn::GetAtt" : [ "logicalNameOfResource", "attributeName" ] } or { "!GetAtt" : "logicalNameOfResource.attributeName" }
        getatt_paths = cfn.search_deep_keys("Fn::GetAtt")
        for getatt_path in getatt_paths:
            ref_type, source_id = getatt_path[:2]
            source_path = getatt_path[2:-2]
            value = getatt_path[-1]
            if not ref_type in self.__supported_types:
                continue

            if ref_type in ["Parameters", "Outputs"]:
                source_id = f"{ref_type[:-1]}-{source_id}"

            if (
                isinstance(value, list)
                and len(value) == 2
                and (self._is_resource(cfn, value[0]))
            ):
                target_resource_id = value[0]
                self._add_edge(
                    source_id, target_resource_id, source_path, self.settings.getatt
                )

            if isinstance(value, (str, str)) and "." in value:
                target_resource_id = value.split(".")[0]
                if self._is_resource(cfn, target_resource_id):
                    self._add_edge(
                        source_id, target_resource_id, source_path, self.settings.getatt
                    )

    def _add_subs(self, cfn: Any) -> None:
        # add edges for "Fn::Sub" tags. E.g. { "Fn::Sub": "arn:aws:ec2:${AWS::Region}:${AWS::AccountId}:vpc/${vpc}" }
        sub_objs = cfn.search_deep_keys("Fn::Sub")
        for sub_obj in sub_objs:
            sub_parameters = []
            sub_parameter_values = {}
            value = sub_obj[-1]
            source_path = sub_obj[2:-2]
            ref_type, source_id = sub_obj[:2]

            if not ref_type in self.__supported_types:
                continue

            if ref_type in ["Parameters", "Outputs"]:
                source_id = f"{ref_type[:-1]}-{source_id}"

            if isinstance(value, list):
                if not value:
                    continue
                if len(value) == 2:
                    sub_parameter_values = value[1]
                sub_parameters = self._find_parameter(value[0])
            elif isinstance(value, (str)):
                sub_parameters = self._find_parameter(value)

            for sub_parameter in sub_parameters:
                if sub_parameter not in sub_parameter_values:
                    if "." in sub_parameter:
                        target_id = sub_parameter.split(".")[0]
                        if self._is_resource(cfn, target_id):
                            self._add_edge(
                                source_id, target_id, source_path, self.settings.getatt
                            )
                    elif self._is_resource(cfn, sub_parameter):
                        self._add_edge(
                            source_id, sub_parameter, source_path, self.settings.ref
                        )

    def _add_node(self, node_id, label, settings):
        if settings.node_type in ["Parameter", "Output"]:
            node_id = f"{settings.node_type}-{node_id}"

        self.graph.add_node(
            node_id,
            label=label,
            color=settings.color,
            shape=settings.shape,
            type=settings.node_type,
        )

    def _add_edge(self, source_id, target_id, source_path, settings):
        self.graph.add_edge(
            source_id,
            target_id,
            source_paths=source_path,
            label=settings.label,
            color=settings.color,
        )

    def _is_resource(self, cfn, identifier):
        """Check if the identifier is that of a Resource"""
        return cfn.template.get("Resources", {}).get(identifier, {})

    def _find_parameter(self, string):
        """Search string for tokenized fields"""
        regex = re.compile(r"\${([a-zA-Z0-9.]*)}")
        if isinstance(string, str):
            return regex.findall(string)

        return []

    # pylint: disable=import-outside-toplevel,unused-variable
    def to_dot(self, path):
        """Export the graph to a file with DOT format"""
        view = self.settings.subgraph_view(self.graph)
        try:
            import pygraphviz  # pylint: disable=unused-import

            networkx.drawing.nx_agraph.write_dot(view, path)
        except ImportError:
            try:
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", category=PendingDeprecationWarning)
                    warnings.simplefilter("ignore", category=DeprecationWarning)
                    import pydot  # pylint: disable=unused-import

                    networkx.drawing.nx_pydot.write_dot(view, path)
            except ImportError as e:
                raise e

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
conditions Folder 0755
data Folder 0755
decode Folder 0755
decorators Folder 0755
formatters Folder 0755
rules Folder 0755
template Folder 0755
__init__.py File 2.47 KB 0644
__main__.py File 1.78 KB 0644
api.py File 1.59 KB 0644
config.py File 26.56 KB 0644
core.py File 11.02 KB 0644
exceptions.py File 592 B 0644
graph.py File 10.85 KB 0644
helpers.py File 21.95 KB 0644
languageExtensions.py File 1.98 KB 0644
maintenance.py File 18.87 KB 0644
runner.py File 3.93 KB 0644
transform.py File 8.48 KB 0644
version.py File 130 B 0644