#!/usr/bin/env python
from nose.tools import *
from nose import SkipTest
import networkx as nx
from networkx.testing.utils import *
import io
import tempfile
import os
class BaseGraphML(object):
    """Fixture holder shared by the GraphML reader and writer test classes.

    For every scenario ``setUp`` stores three attributes on the instance:
    the text payload (``*_data``), a graph built by hand (``*_graph``)
    and a binary stream with the UTF-8 encoded payload (``*_fh``).
    """

    @staticmethod
    def _utf8_stream(text):
        """Return an ``io.BytesIO`` wrapping ``text`` encoded as UTF-8."""
        return io.BytesIO(text.encode('UTF-8'))

    def setUp(self):
        """Build fresh payloads, graphs and streams before each test."""
        # NOTE(review): the payload literals in this method hold only bare
        # values and no XML tags; presumably the GraphML markup was lost
        # before this file was captured -- confirm against the upstream
        # test data.

        # ---- simple directed scenario --------------------------------
        self.simple_directed_data = """
"""
        directed = nx.DiGraph()
        directed.add_node('n10')
        directed.add_edge('n0', 'n2', id='foo')
        directed.add_edges_from([
            ('n1', 'n2'),
            ('n2', 'n3'),
            ('n3', 'n5'),
            ('n3', 'n4'),
            ('n4', 'n6'),
            ('n6', 'n5'),
            ('n5', 'n7'),
            ('n6', 'n8'),
            ('n8', 'n7'),
            ('n8', 'n9'),
        ])
        self.simple_directed_graph = directed
        self.simple_directed_fh = self._utf8_stream(self.simple_directed_data)

        # ---- attributed directed scenario ----------------------------
        self.attribute_data = """
yellow
green
blue
red
turquoise
1.0
1.0
2.0
1.1
"""
        attributed = nx.DiGraph(id='G')
        attributed.graph['node_default'] = {'color': 'yellow'}
        attributed.add_node('n0', color='green')
        attributed.add_node('n2', color='blue')
        attributed.add_node('n3', color='red')
        attributed.add_node('n4')
        attributed.add_node('n5', color='turquoise')
        # Same edges, in the same order, as individual add_edge calls.
        attributed.add_edges_from([
            ('n0', 'n2', {'id': 'e0', 'weight': 1.0}),
            ('n0', 'n1', {'id': 'e1', 'weight': 1.0}),
            ('n1', 'n3', {'id': 'e2', 'weight': 2.0}),
            ('n3', 'n2', {'id': 'e3'}),
            ('n2', 'n4', {'id': 'e4'}),
            ('n3', 'n5', {'id': 'e5'}),
            ('n5', 'n4', {'id': 'e6', 'weight': 1.1}),
        ])
        self.attribute_graph = attributed
        self.attribute_fh = self._utf8_stream(self.attribute_data)

        # ---- mixed int / float attribute scenario --------------------
        self.attribute_numeric_type_data = """
1
2.0
1
k
1.0
"""
        numeric = nx.DiGraph()
        numeric.add_node('n0', weight=1)
        numeric.add_node('n1', weight=2.0)
        numeric.add_edges_from([
            ('n0', 'n1', {'weight': 1}),
            ('n1', 'n1', {'weight': 1.0}),
        ])
        self.attribute_numeric_type_graph = numeric
        self.attribute_numeric_type_fh = self._utf8_stream(
            self.attribute_numeric_type_data)

        # ---- simple undirected scenario ------------------------------
        self.simple_undirected_data = """
"""
        undirected = nx.Graph()
        undirected.add_node('n10')
        undirected.add_edge('n0', 'n2', id='foo')
        undirected.add_edges_from([
            ('n1', 'n2'),
            ('n2', 'n3'),
        ])
        self.simple_undirected_graph = undirected
        self.simple_undirected_fh = self._utf8_stream(
            self.simple_undirected_data)
class TestReadGraphML(BaseGraphML):
    """Tests for ``nx.read_graphml`` (stream input) and ``nx.parse_graphml``
    (string input).

    NOTE(review): the inline payload literals in this class hold only bare
    values and no XML tags; presumably the GraphML markup was lost before
    this file was captured -- confirm against the upstream test data.
    """

    @staticmethod
    def _assert_same_edge_data(expected, actual):
        """Assert both graphs have identical sorted ``edges(data=True)``.

        The lengths are compared first because ``zip`` stops at the shorter
        sequence; without that check a graph with missing or extra edges
        would pass the element-wise comparison unnoticed.
        """
        expected_edges = sorted(expected.edges(data=True))
        actual_edges = sorted(actual.edges(data=True))
        assert_equal(len(expected_edges), len(actual_edges))
        for want, got in zip(expected_edges, actual_edges):
            assert_equal(want, got)

    def test_read_simple_directed_graphml(self):
        """The simple directed payload matches the hand-built graph."""
        G = self.simple_directed_graph
        H = nx.read_graphml(self.simple_directed_fh)
        assert_equal(sorted(G.nodes()), sorted(H.nodes()))
        assert_equal(sorted(G.edges()), sorted(H.edges()))
        assert_equal(sorted(G.edges(data=True)),
                     sorted(H.edges(data=True)))
        self.simple_directed_fh.seek(0)

        I = nx.parse_graphml(self.simple_directed_data)
        assert_equal(sorted(G.nodes()), sorted(I.nodes()))
        assert_equal(sorted(G.edges()), sorted(I.edges()))
        assert_equal(sorted(G.edges(data=True)),
                     sorted(I.edges(data=True)))

    def test_read_simple_undirected_graphml(self):
        """The simple undirected payload matches the hand-built graph."""
        G = self.simple_undirected_graph
        H = nx.read_graphml(self.simple_undirected_fh)
        assert_nodes_equal(G.nodes(), H.nodes())
        assert_edges_equal(G.edges(), H.edges())
        self.simple_undirected_fh.seek(0)

        I = nx.parse_graphml(self.simple_undirected_data)
        assert_nodes_equal(G.nodes(), I.nodes())
        assert_edges_equal(G.edges(), I.edges())

    def test_read_attribute_graphml(self):
        """Node and edge attributes read back equal the hand-built graph."""
        G = self.attribute_graph
        H = nx.read_graphml(self.attribute_fh)
        assert_nodes_equal(G.nodes(True), sorted(H.nodes(data=True)))
        self._assert_same_edge_data(G, H)
        self.attribute_fh.seek(0)

        I = nx.parse_graphml(self.attribute_data)
        assert_equal(sorted(G.nodes(True)), sorted(I.nodes(data=True)))
        self._assert_same_edge_data(G, I)

    def test_directed_edge_in_undirected(self):
        """Both readers must raise ``NetworkXError`` for this payload."""
        s = """
"""
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_undirected_edge_in_directed(self):
        """Both readers must raise ``NetworkXError`` for this payload."""
        s = """
"""
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_key_raise(self):
        """Both readers must raise ``NetworkXError`` for this payload."""
        s = """
yellow
green
blue
1.0
"""
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_hyperedge_raise(self):
        """Both readers must raise ``NetworkXError`` for this payload."""
        s = """
yellow
green
blue
"""
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_multigraph_keys(self):
        """Reading multigraphs uses edge id attributes as keys."""
        s = """
"""
        fh = io.BytesIO(s.encode('UTF-8'))
        G = nx.read_graphml(fh)
        expected = [("n0", "n1", "e0"), ("n0", "n1", "e1")]
        assert_equal(sorted(G.edges(keys=True)), expected)
        fh.seek(0)
        H = nx.parse_graphml(s)
        assert_equal(sorted(H.edges(keys=True)), expected)

    def test_preserve_multi_edge_data(self):
        """Edge data and keys survive a write followed by a read.

        Covers parallel edges with no data or key, with data only (using
        the attribute names ``key`` and ``id``), and with an explicit key
        plus data.
        """
        G = nx.MultiGraph()
        G.add_node(1)
        G.add_node(2)
        G.add_edges_from([
            # edges with no data, no keys:
            (1, 2),
            # edges with only data:
            (1, 2, dict(key='data_key1')),
            (1, 2, dict(id='data_id2')),
            (1, 2, dict(key='data_key3', id='data_id3')),
            # edges with both data and keys:
            (1, 2, 103, dict(key='data_key4')),
            (1, 2, 104, dict(id='data_id5')),
            (1, 2, 105, dict(key='data_key6', id='data_id7')),
        ])
        fh = io.BytesIO()
        nx.write_graphml(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh, node_type=int)
        assert_edges_equal(
            G.edges(data=True, keys=True), H.edges(data=True, keys=True)
        )
        assert_equal(G._adj, H._adj)

    def test_yfiles_extension(self):
        """Edge id and node labels are read from this payload by both readers."""
        data = """
1
2
"""
        fh = io.BytesIO(data.encode('UTF-8'))
        G = nx.read_graphml(fh)
        assert_equal(list(G.edges()), [('n0', 'n1')])
        assert_equal(G['n0']['n1']['id'], 'e0')
        assert_equal(G.nodes['n0']['label'], '1')
        assert_equal(G.nodes['n1']['label'], '2')

        H = nx.parse_graphml(data)
        assert_equal(list(H.edges()), [('n0', 'n1')])
        assert_equal(H['n0']['n1']['id'], 'e0')
        assert_equal(H.nodes['n0']['label'], '1')
        assert_equal(H.nodes['n1']['label'], '2')

    def test_bool(self):
        """The ``test`` node attribute is read back as a Python bool."""
        s = """
false
true
false
FaLsE
True
0
1
"""
        fh = io.BytesIO(s.encode('UTF-8'))
        G = nx.read_graphml(fh)
        H = nx.parse_graphml(s)
        for graph in [G, H]:
            assert_equal(graph.nodes['n0']['test'], True)
            assert_equal(graph.nodes['n2']['test'], False)
            assert_equal(graph.nodes['n3']['test'], False)
            assert_equal(graph.nodes['n4']['test'], True)
            assert_equal(graph.nodes['n5']['test'], False)
            assert_equal(graph.nodes['n6']['test'], True)

    def test_graphml_header_line(self):
        """``good`` and ``bad`` payloads parse; ``ugly`` raises NetworkXError."""
        good = """
false
true
"""
        bad = """
false
true
"""
        ugly = """
false
true
"""
        for s in (good, bad):
            fh = io.BytesIO(s.encode('UTF-8'))
            G = nx.read_graphml(fh)
            H = nx.parse_graphml(s)
            for graph in [G, H]:
                assert_equal(graph.nodes['n0']['test'], True)

        fh = io.BytesIO(ugly.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, ugly)

    def test_read_attributes_with_groups(self):
        """Nodes and attributes are read correctly when part of a group.

        Expects nine nodes, each with a non-empty ``CustomProperty``.
        """
        payload = """\
2
Group 3
Folder 3
Group 1
Folder 1
1
3
Group 2
Folder 2
5
6
9
"""
        fh = io.BytesIO(payload.encode('UTF-8'))
        G = nx.read_graphml(fh)
        node_attrs = [attrs for _, attrs in G.nodes(data=True)]
        assert_equal(len(node_attrs), 9)
        for attrs in node_attrs:
            assert_not_equal(attrs['CustomProperty'], '')
class TestWriteGraphML(BaseGraphML):
    """Write-then-read tests for the GraphML writer.

    The writer under test is the class attribute ``writer``
    (``nx.write_graphml_lxml`` here); every test calls ``self.writer`` so
    a subclass can swap in a different implementation.
    """
    writer = staticmethod(nx.write_graphml_lxml)

    @classmethod
    def setupClass(cls):
        """Skip the whole class when ``lxml.etree`` cannot be imported."""
        try:
            import lxml.etree
        except ImportError:
            raise SkipTest('lxml.etree not available.')

    @staticmethod
    def _import_numpy():
        """Return the ``numpy`` module, or raise ``SkipTest`` if it is missing.

        Raising ``SkipTest`` reports the test as skipped instead of letting
        it pass silently without having exercised anything.
        """
        try:
            import numpy
        except ImportError:
            raise SkipTest('numpy not available.')
        return numpy

    def _write_then_read(self, G, **read_kwargs):
        """Write ``G`` to a temporary file with ``self.writer`` and read it back.

        ``read_kwargs`` are forwarded to ``nx.read_graphml``. The temporary
        file descriptor is closed and the file removed even when writing or
        reading raises, so a failure does not leak either of them.
        """
        fd, fname = tempfile.mkstemp()
        try:
            self.writer(G, fname)
            return nx.read_graphml(fname, **read_kwargs)
        finally:
            os.close(fd)
            os.unlink(fname)

    def test_write_interface(self):
        """``nx.write_graphml`` is the lxml writer when lxml imports,
        otherwise the xml writer."""
        try:
            import lxml.etree
        except ImportError:
            assert_equal(nx.write_graphml, nx.write_graphml_xml)
        else:
            assert_equal(nx.write_graphml, nx.write_graphml_lxml)

    def test_write_read_simple_directed_graphml(self):
        """Nodes, edges and edge data survive a write/read cycle."""
        G = self.simple_directed_graph
        G.graph['hi'] = 'there'
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert_equal(sorted(G.nodes()), sorted(H.nodes()))
        assert_equal(sorted(G.edges()), sorted(H.edges()))
        assert_equal(sorted(G.edges(data=True)), sorted(H.edges(data=True)))
        self.simple_directed_fh.seek(0)

    def test_write_read_attribute_numeric_type_graphml(self):
        """With ``infer_numeric_types=True`` both keys are typed ``double``."""
        from xml.etree.ElementTree import parse
        G = self.attribute_numeric_type_graph
        fh = io.BytesIO()
        self.writer(G, fh, infer_numeric_types=True)
        fh.seek(0)
        H = nx.read_graphml(fh)
        fh.seek(0)

        assert_nodes_equal(G.nodes(), H.nodes())
        assert_edges_equal(G.edges(), H.edges())
        assert_edges_equal(G.edges(data=True), H.edges(data=True))
        self.attribute_numeric_type_fh.seek(0)

        tree = parse(fh)
        # Children are the key elements, and the graph element.
        # Iterating the element replaces Element.getchildren(), which was
        # removed in Python 3.9.
        children = list(tree.getroot())
        assert_equal(len(children), 3)

        keys = [child.items() for child in children[:2]]
        assert_equal(len(keys), 2)
        assert_in(('attr.type', 'double'), keys[0])
        assert_in(('attr.type', 'double'), keys[1])

    def test_more_multigraph_keys(self):
        """Writing keys as edge id attributes means keys become strings.
        The original keys are stored as data, so read them back in
        if `make_str(key) == edge_id`
        This allows the adjacency to remain the same.
        """
        G = nx.MultiGraph()
        G.add_edges_from([('a', 'b', 2), ('a', 'b', 3)])
        H = self._write_then_read(G)
        assert_true(H.is_multigraph())
        assert_edges_equal(G.edges(keys=True), H.edges(keys=True))
        assert_equal(G._adj, H._adj)

    def test_default_attribute(self):
        """Graph attributes, including the node/edge defaults, round-trip."""
        G = nx.Graph(name="Fred")
        G.add_node(1, label=1, color='green')
        nx.add_path(G, [0, 1, 2, 3])
        G.add_edge(1, 2, weight=3)
        G.graph['node_default'] = {'color': 'yellow'}
        G.graph['edge_default'] = {'weight': 7}
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh, node_type=int)
        assert_nodes_equal(G.nodes(), H.nodes())
        assert_edges_equal(G.edges(), H.edges())
        assert_equal(G.graph, H.graph)

    def test_multigraph_to_graph(self):
        """A multigraph without parallel edges is read back as a graph."""
        G = nx.MultiGraph()
        G.add_edges_from([('a', 'b', 2), ('b', 'c', 3)])  # no multiedges
        H = self._write_then_read(G)
        assert_false(H.is_multigraph())

    def test_numpy_float(self):
        """An edge weight created via ``np.float`` round-trips unchanged."""
        np = self._import_numpy()
        # ``np.float`` is no longer available in NumPy >= 1.24; fall back
        # to the builtin ``float`` so the test still runs there.
        np_float = getattr(np, 'float', float)
        wt = np_float(3.4)
        G = nx.Graph([(1, 2, {'weight': wt})])
        H = self._write_then_read(G, node_type=int)
        assert_equal(G._adj, H._adj)

    def test_numpy_float64(self):
        """An ``np.float64`` weight is written and read back as ``float``."""
        np = self._import_numpy()
        wt = np.float64(3.4)
        G = nx.Graph([(1, 2, {'weight': wt})])
        H = self._write_then_read(G, node_type=int)
        assert_equal(G.edges, H.edges)
        wtG = G[1][2]['weight']
        wtH = H[1][2]['weight']
        assert_almost_equal(wtG, wtH, places=6)
        assert_equal(type(wtG), np.float64)
        assert_equal(type(wtH), float)

    def test_numpy_float32(self):
        """An ``np.float32`` weight is written and read back as ``float``."""
        np = self._import_numpy()
        wt = np.float32(3.4)
        G = nx.Graph([(1, 2, {'weight': wt})])
        H = self._write_then_read(G, node_type=int)
        assert_equal(G.edges, H.edges)
        wtG = G[1][2]['weight']
        wtH = H[1][2]['weight']
        assert_almost_equal(wtG, wtH, places=6)
        assert_equal(type(wtG), np.float32)
        assert_equal(type(wtH), float)

    def test_unicode_attributes(self):
        """Non-ASCII node names and attribute values round-trip unchanged."""
        G = nx.Graph()
        try:  # Python 3.x
            name1 = chr(2344) + chr(123) + chr(6543)
            name2 = chr(5543) + chr(1543) + chr(324)
            node_type = str
        except ValueError:  # Python 2.6+
            name1 = unichr(2344) + unichr(123) + unichr(6543)
            name2 = unichr(5543) + unichr(1543) + unichr(324)
            node_type = unicode
        G.add_edge(name1, 'Radiohead', foo=name2)
        H = self._write_then_read(G, node_type=node_type)
        assert_equal(G._adj, H._adj)

    def test_unicode_escape(self):
        """A JSON-escaped string stored as a graph attribute round-trips."""
        # test for handling json escaped strings in python 2 Issue #1880
        import json
        a = dict(a='{"a": "123"}')  # an object with many chars to escape
        try:  # Python 3.x
            chr(2344)
            sa = json.dumps(a)
        except ValueError:  # Python 2.6+
            sa = unicode(json.dumps(a))
        G = nx.Graph()
        G.graph['test'] = sa
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert_equal(G.graph['test'], H.graph['test'])
class TestXMLGraphML(TestWriteGraphML):
    """Run the inherited writer tests with ``nx.write_graphml_xml``."""
    writer = staticmethod(nx.write_graphml_xml)

    @classmethod
    def setupClass(cls):
        """Skip the whole class when ``xml.etree.ElementTree`` is missing."""
        try:
            # Imported only to probe availability; the name is not used.
            from xml.etree import ElementTree
        except ImportError:
            raise SkipTest('xml.etree.ElementTree not available.')