"""
Helpers for loading resources, managing specs, constants, etc.
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import logging
import warnings
from typing import Any, List
import networkx
import regex as re
LOGGER = logging.getLogger("cfnlint.graph")
class EdgeSetting:
color: str = "black"
labal: str = ""
def __init__(self, color: str, label: str) -> None:
self.color = color
self.label = label
class NodeSetting:
color: str = "black"
shape: str
node_type: str
def __init__(self, color: str, node_type: str, shape: str = "ellipse") -> None:
self.color = color
self.shape = shape
self.node_type = node_type
class GraphSettings:
ref: EdgeSetting
getatt: EdgeSetting
depends_on: EdgeSetting
resource: NodeSetting
parameter: NodeSetting
output: NodeSetting
def subgraph_view(self, graph) -> networkx.MultiDiGraph:
view = networkx.MultiDiGraph(name="template")
resources: List[str] = [
n for n, v in graph.nodes.items() if v["type"] in ["Resource"]
]
view.add_nodes_from((n, graph.nodes[n]) for n in resources)
view.add_edges_from(
(n, nbr, key, d)
for n, nbrs in graph.adj.items()
if n in resources
for nbr, keydict in nbrs.items()
if nbr in resources
for key, d in keydict.items()
)
view.graph.update(graph.graph)
return view
class DefaultGraphSettings(GraphSettings):
ref = EdgeSetting(color="black", label="Ref")
getatt = EdgeSetting(color="black", label="GetAtt")
depends_on = EdgeSetting(color="black", label="DependsOn")
resource = NodeSetting(color="black", node_type="Resource")
parameter = NodeSetting(color="black", node_type="Parameter", shape="box")
output = NodeSetting(color="black", node_type="Output", shape="box")
class Graph:
"""Models a template as a directed graph of resources"""
settings: GraphSettings
__supported_types: List[str] = ["Resources", "Parameters", "Outputs"]
def __init__(self, cfn):
"""Builds a graph where resources are nodes and edges are explicit (DependsOn) or implicit (Fn::GetAtt, Fn::Sub, Ref)
relationships between resources"""
self.settings = DefaultGraphSettings()
# Directed graph that allows self loops and parallel edges
self.graph = networkx.MultiDiGraph(name="template")
self._add_resources(cfn)
self._add_parameters(cfn)
self._add_outputs(cfn)
self._add_refs(cfn)
self._add_getatts(cfn)
self._add_subs(cfn)
def get_cycles(self, cfn):
"""Return all resource pairs that have a cycle in them"""
result = []
for starting_resource in cfn.template.get("Resources", {}):
try:
for edge in list(networkx.find_cycle(self.graph, starting_resource)):
if edge not in result:
result.append(edge)
except networkx.NetworkXNoCycle:
continue
return result
def _add_parameters(self, cfn: Any) -> None:
# add all parameters in the template as nodes
for parameter_id, parameter_values in cfn.template.get(
"Parameters", {}
).items():
if not isinstance(parameter_values, dict):
continue
type_val = parameter_values.get("Type", "")
if not isinstance(type_val, str):
continue
graph_label = str.format(f'"{parameter_id}\\n<{type_val}>"')
self._add_node(
parameter_id, label=graph_label, settings=self.settings.parameter
)
def _add_outputs(self, cfn: Any) -> None:
# add all outputs in the template as nodes
for output_id in cfn.template.get("Outputs", {}).keys():
graph_label = str.format(f'"{output_id}"')
self._add_node(output_id, label=graph_label, settings=self.settings.output)
def _add_resources(self, cfn: Any):
# add all resources in the template as nodes
for resourceId, resourceVals in cfn.template.get("Resources", {}).items():
if not isinstance(resourceVals, dict):
continue
type_val = resourceVals.get("Type", "")
if not isinstance(type_val, str):
continue
graph_label = str.format(f'"{resourceId}\\n<{type_val}>"')
self._add_node(
resourceId, label=graph_label, settings=self.settings.resource
)
target_ids = resourceVals.get("DependsOn", [])
if isinstance(target_ids, (list, str)):
if isinstance(target_ids, (str)):
target_ids = [target_ids]
for target_id in target_ids:
if isinstance(target_id, str):
if self._is_resource(cfn, target_id):
self._add_edge(
resourceId,
target_id,
["DependsOn"],
self.settings.depends_on,
)
def _add_refs(self, cfn: Any) -> None:
# add edges for "Ref" tags. { "Ref" : "logicalNameOfResource" }
refs_paths = cfn.search_deep_keys("Ref")
for ref_path in refs_paths:
ref_type, source_id = ref_path[:2]
source_path = ref_path[2:-2]
target_id = ref_path[-1]
if not ref_type in self.__supported_types:
continue
if ref_type in ["Parameters", "Outputs"]:
source_id = f"{ref_type[:-1]}-{source_id}"
if isinstance(target_id, (str, int)) and (
self._is_resource(cfn, target_id)
):
self._add_edge(source_id, target_id, source_path, self.settings.ref)
def _add_getatts(self, cfn: Any) -> None:
# add edges for "Fn::GetAtt" tags.
# { "Fn::GetAtt" : [ "logicalNameOfResource", "attributeName" ] } or { "!GetAtt" : "logicalNameOfResource.attributeName" }
getatt_paths = cfn.search_deep_keys("Fn::GetAtt")
for getatt_path in getatt_paths:
ref_type, source_id = getatt_path[:2]
source_path = getatt_path[2:-2]
value = getatt_path[-1]
if not ref_type in self.__supported_types:
continue
if ref_type in ["Parameters", "Outputs"]:
source_id = f"{ref_type[:-1]}-{source_id}"
if (
isinstance(value, list)
and len(value) == 2
and (self._is_resource(cfn, value[0]))
):
target_resource_id = value[0]
self._add_edge(
source_id, target_resource_id, source_path, self.settings.getatt
)
if isinstance(value, (str, str)) and "." in value:
target_resource_id = value.split(".")[0]
if self._is_resource(cfn, target_resource_id):
self._add_edge(
source_id, target_resource_id, source_path, self.settings.getatt
)
def _add_subs(self, cfn: Any) -> None:
# add edges for "Fn::Sub" tags. E.g. { "Fn::Sub": "arn:aws:ec2:${AWS::Region}:${AWS::AccountId}:vpc/${vpc}" }
sub_objs = cfn.search_deep_keys("Fn::Sub")
for sub_obj in sub_objs:
sub_parameters = []
sub_parameter_values = {}
value = sub_obj[-1]
source_path = sub_obj[2:-2]
ref_type, source_id = sub_obj[:2]
if not ref_type in self.__supported_types:
continue
if ref_type in ["Parameters", "Outputs"]:
source_id = f"{ref_type[:-1]}-{source_id}"
if isinstance(value, list):
if not value:
continue
if len(value) == 2:
sub_parameter_values = value[1]
sub_parameters = self._find_parameter(value[0])
elif isinstance(value, (str)):
sub_parameters = self._find_parameter(value)
for sub_parameter in sub_parameters:
if sub_parameter not in sub_parameter_values:
if "." in sub_parameter:
target_id = sub_parameter.split(".")[0]
if self._is_resource(cfn, target_id):
self._add_edge(
source_id, target_id, source_path, self.settings.getatt
)
elif self._is_resource(cfn, sub_parameter):
self._add_edge(
source_id, sub_parameter, source_path, self.settings.ref
)
def _add_node(self, node_id, label, settings):
if settings.node_type in ["Parameter", "Output"]:
node_id = f"{settings.node_type}-{node_id}"
self.graph.add_node(
node_id,
label=label,
color=settings.color,
shape=settings.shape,
type=settings.node_type,
)
def _add_edge(self, source_id, target_id, source_path, settings):
self.graph.add_edge(
source_id,
target_id,
source_paths=source_path,
label=settings.label,
color=settings.color,
)
def _is_resource(self, cfn, identifier):
"""Check if the identifier is that of a Resource"""
return cfn.template.get("Resources", {}).get(identifier, {})
def _find_parameter(self, string):
"""Search string for tokenized fields"""
regex = re.compile(r"\${([a-zA-Z0-9.]*)}")
if isinstance(string, str):
return regex.findall(string)
return []
# pylint: disable=import-outside-toplevel,unused-variable
def to_dot(self, path):
"""Export the graph to a file with DOT format"""
view = self.settings.subgraph_view(self.graph)
try:
import pygraphviz # pylint: disable=unused-import
networkx.drawing.nx_agraph.write_dot(view, path)
except ImportError:
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=PendingDeprecationWarning)
warnings.simplefilter("ignore", category=DeprecationWarning)
import pydot # pylint: disable=unused-import
networkx.drawing.nx_pydot.write_dot(view, path)
except ImportError as e:
raise e