#!/usr/bin/env python3
"""A script to analyze the dependency relations in a CloudFormation template.
Use like so:
cdk -a integ.something.js synth | ./template-deps-to-dot | dot -Tpng > deps.png
"""
import collections
import fileinput
import sys
try:
import yaml
except ImportError:
print("Please run 'pip3 install pyyaml'")
sys.exit(1)
def main():
args = sys.argv[1:]
if args:
files = [open(filename) for filename in args]
else:
files = [sys.stdin]
templates = [yaml.safe_load(f) for f in files]
graph = DepGraph()
for template in templates:
if not template:
sys.stderr.write('Input does not look like a CloudFormation template.\n')
continue
parse_template(template, graph)
graph.render_dot(sys.stdout)
def parse_template(template, graph):
"""Parse template and add all encountered dependencies to the graph."""
for logical_id, resource_spec in template.get('Resources', {}).items():
resource_type = resource_spec.get('Type', 'AWS::???::???')
path = resource_spec.get('Metadata', {}).get('aws:cdk:path', None)
if path:
path = resource_name_from_path(path)
source = '%s\n(%s)' % (path or logical_id, resource_type)
graph.annotate(logical_id, source)
for dep in find_property_references(resource_spec.get('Properties', {})):
if not dep.target.startswith('AWS::'):
graph.add(logical_id, dep)
for depends_on in resource_spec.get('DependsOn', []):
graph.add(logical_id, Dep(depends_on, 'DependsOn'))
def resource_name_from_path(path):
return '/'.join([p for p in path.split('/') if p != 'Resource'][1:])
def find_property_references(properties):
"""Find references in a resource's Properties.
Returns:
list of Dep objects
"""
ret = []
def recurse(prop_name, obj):
if isinstance(obj, list):
for x in obj:
recurse(prop_name, x)
if isinstance(obj, dict):
ref = parse_reference(obj)
if ref:
ret.append(Dep(ref[0], prop_name))
return
for key, value in obj.items():
recurse(prop_name, value)
for prop_name, prop in properties.items():
recurse(prop_name, prop)
return ret
class DepGraph:
def __init__(self):
# { source -> [ { dependency, label } ]
self.graph = collections.defaultdict(set)
self.annotations = {}
self.has_incoming = set([])
def annotate(self, node, annotation):
self.annotations[node] = annotation
def add(self, source, dep):
self.graph[source].add(dep)
self.has_incoming.add(dep.target)
def render_dot(self, f):
"""Render a dot version of this graph to the given stream."""
f.write('digraph G {\n')
f.write(' rankdir=LR;\n')
f.write(' node [shape=box];\n')
for node, annotation in self.annotations.items():
if node in self.graph or node in self.has_incoming:
f.write(' %s [label=%s];\n'% (dot_escape(node), fancy_label(annotation)))
for source, deps in self.graph.items():
for dep in deps:
f.write(' %s -> %s [label=%s];\n' % (dot_escape(source), dot_escape(dep.target), dot_escape(dep.label)))
f.write('}\n')
def fancy_label(s):
lines = s.split('\n')
return ('<' + lines[0] + ''
+ ''.join('
' + line + '' for line in lines[1:])
+ ' >')
def dot_escape(s):
return '"' + s.replace('\n', '\\n') + '"'
def parse_reference(obj):
"""If this object is an intrinsic reference, return info about it.
Returns: (logicalId, reference) if reference, None otherwise
"""
keys = list(obj.keys())
if keys == ['Ref']:
return (obj[keys[0]], 'Ref')
if keys == ['Fn::GetAtt']:
return (obj[keys[0]][0], obj[keys[0]][1])
return None
class Dep:
def __init__(self, target, label):
self.target = target
self.label = label
def __eq__(self, rhs):
return isinstance(rhs, Dep) and self.target == rhs.target and self.label == rhs.label
def __ne__(self, rhs):
return not (self == rhs)
def __hash__(self):
return hash((self.target, self.label))
if __name__ == '__main__':
main()