#!/usr/bin/env python
"""Tests for reading and writing GraphML documents with networkx."""
from nose.tools import *
from nose import SkipTest
import networkx as nx
from networkx.testing.utils import *
import io
import tempfile
import os


class BaseGraphML(object):
    """Shared fixtures: GraphML documents and the graphs they should yield.

    NOTE(review): the GraphML payloads in this module's string literals look
    like they have lost their XML markup (only element text remains). They
    are reproduced verbatim here -- confirm against the upstream fixtures
    before relying on these tests.
    """

    def setUp(self):
        """Build each fixture as raw text, expected graph and byte stream."""
        # Simple directed graph.
        self.simple_directed_data = """ """
        self.simple_directed_graph = nx.DiGraph()
        self.simple_directed_graph.add_node('n10')
        self.simple_directed_graph.add_edge('n0', 'n2', id='foo')
        self.simple_directed_graph.add_edges_from([('n1', 'n2'),
                                                   ('n2', 'n3'),
                                                   ('n3', 'n5'),
                                                   ('n3', 'n4'),
                                                   ('n4', 'n6'),
                                                   ('n6', 'n5'),
                                                   ('n5', 'n7'),
                                                   ('n6', 'n8'),
                                                   ('n8', 'n7'),
                                                   ('n8', 'n9'),
                                                   ])
        self.simple_directed_fh = \
            io.BytesIO(self.simple_directed_data.encode('UTF-8'))

        # Directed graph with node/edge attributes and a node default.
        self.attribute_data = """ yellow green blue red turquoise 1.0 1.0 2.0 1.1 """
        self.attribute_graph = nx.DiGraph(id='G')
        self.attribute_graph.graph['node_default'] = {'color': 'yellow'}
        self.attribute_graph.add_node('n0', color='green')
        self.attribute_graph.add_node('n2', color='blue')
        self.attribute_graph.add_node('n3', color='red')
        self.attribute_graph.add_node('n4')
        self.attribute_graph.add_node('n5', color='turquoise')
        self.attribute_graph.add_edge('n0', 'n2', id='e0', weight=1.0)
        self.attribute_graph.add_edge('n0', 'n1', id='e1', weight=1.0)
        self.attribute_graph.add_edge('n1', 'n3', id='e2', weight=2.0)
        self.attribute_graph.add_edge('n3', 'n2', id='e3')
        self.attribute_graph.add_edge('n2', 'n4', id='e4')
        self.attribute_graph.add_edge('n3', 'n5', id='e5')
        self.attribute_graph.add_edge('n5', 'n4', id='e6', weight=1.1)
        self.attribute_fh = io.BytesIO(self.attribute_data.encode('UTF-8'))

        # Directed graph mixing int and float values for the same attribute.
        self.attribute_numeric_type_data = """ 1 2.0 1 k 1.0 """
        self.attribute_numeric_type_graph = nx.DiGraph()
        self.attribute_numeric_type_graph.add_node('n0', weight=1)
        self.attribute_numeric_type_graph.add_node('n1', weight=2.0)
        self.attribute_numeric_type_graph.add_edge('n0', 'n1', weight=1)
        self.attribute_numeric_type_graph.add_edge('n1', 'n1', weight=1.0)
        fh = io.BytesIO(self.attribute_numeric_type_data.encode('UTF-8'))
        self.attribute_numeric_type_fh = fh

        # Simple undirected graph.
        self.simple_undirected_data = """ """
        self.simple_undirected_graph = nx.Graph()
        self.simple_undirected_graph.add_node('n10')
        self.simple_undirected_graph.add_edge('n0', 'n2', id='foo')
        self.simple_undirected_graph.add_edges_from([('n1', 'n2'),
                                                     ('n2', 'n3'),
                                                     ])
        fh = io.BytesIO(self.simple_undirected_data.encode('UTF-8'))
        self.simple_undirected_fh = fh


class TestReadGraphML(BaseGraphML):
    """Tests for ``nx.read_graphml`` and ``nx.parse_graphml``."""

    def test_read_simple_directed_graphml(self):
        G = self.simple_directed_graph
        H = nx.read_graphml(self.simple_directed_fh)
        assert_equal(sorted(G.nodes()), sorted(H.nodes()))
        assert_equal(sorted(G.edges()), sorted(H.edges()))
        assert_equal(sorted(G.edges(data=True)),
                     sorted(H.edges(data=True)))
        self.simple_directed_fh.seek(0)

        I = nx.parse_graphml(self.simple_directed_data)
        assert_equal(sorted(G.nodes()), sorted(I.nodes()))
        assert_equal(sorted(G.edges()), sorted(I.edges()))
        assert_equal(sorted(G.edges(data=True)),
                     sorted(I.edges(data=True)))

    def test_read_simple_undirected_graphml(self):
        G = self.simple_undirected_graph
        H = nx.read_graphml(self.simple_undirected_fh)
        assert_nodes_equal(G.nodes(), H.nodes())
        assert_edges_equal(G.edges(), H.edges())
        self.simple_undirected_fh.seek(0)

        I = nx.parse_graphml(self.simple_undirected_data)
        assert_nodes_equal(G.nodes(), I.nodes())
        assert_edges_equal(G.edges(), I.edges())

    def test_read_attribute_graphml(self):
        G = self.attribute_graph
        H = nx.read_graphml(self.attribute_fh)
        assert_nodes_equal(G.nodes(True), sorted(H.nodes(data=True)))
        ge = sorted(G.edges(data=True))
        he = sorted(H.edges(data=True))
        # zip() stops at the shorter sequence, so check the lengths first;
        # otherwise missing or extra edges would go unnoticed.
        assert_equal(len(ge), len(he))
        for a, b in zip(ge, he):
            assert_equal(a, b)
        self.attribute_fh.seek(0)

        I = nx.parse_graphml(self.attribute_data)
        assert_equal(sorted(G.nodes(True)), sorted(I.nodes(data=True)))
        ge = sorted(G.edges(data=True))
        ie = sorted(I.edges(data=True))
        assert_equal(len(ge), len(ie))
        for a, b in zip(ge, ie):
            assert_equal(a, b)

    def test_directed_edge_in_undirected(self):
        s = """ """
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_undirected_edge_in_directed(self):
        s = """ """
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_key_raise(self):
        s = """ yellow green blue 1.0 """
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_hyperedge_raise(self):
        s = """ yellow green blue """
        fh = io.BytesIO(s.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_multigraph_keys(self):
        # Test that reading multigraphs uses edge id attributes as keys
        s = """ """
        fh = io.BytesIO(s.encode('UTF-8'))
        G = nx.read_graphml(fh)
        expected = [("n0", "n1", "e0"), ("n0", "n1", "e1")]
        assert_equal(sorted(G.edges(keys=True)), expected)
        fh.seek(0)
        H = nx.parse_graphml(s)
        assert_equal(sorted(H.edges(keys=True)), expected)

    def test_preserve_multi_edge_data(self):
        """
        Test that data and keys of edges are preserved on consequent
        write and reads
        """
        G = nx.MultiGraph()
        G.add_node(1)
        G.add_node(2)
        G.add_edges_from([
            # edges with no data, no keys:
            (1, 2),
            # edges with only data:
            (1, 2, dict(key='data_key1')),
            (1, 2, dict(id='data_id2')),
            (1, 2, dict(key='data_key3', id='data_id3')),
            # edges with both data and keys:
            (1, 2, 103, dict(key='data_key4')),
            (1, 2, 104, dict(id='data_id5')),
            (1, 2, 105, dict(key='data_key6', id='data_id7')),
        ])
        fh = io.BytesIO()
        nx.write_graphml(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh, node_type=int)
        assert_edges_equal(
            G.edges(data=True, keys=True), H.edges(data=True, keys=True)
        )
        assert_equal(G._adj, H._adj)

    def test_yfiles_extension(self):
        data = """ 1 2 """
        fh = io.BytesIO(data.encode('UTF-8'))
        G = nx.read_graphml(fh)
        assert_equal(list(G.edges()), [('n0', 'n1')])
        assert_equal(G['n0']['n1']['id'], 'e0')
        assert_equal(G.nodes['n0']['label'], '1')
        assert_equal(G.nodes['n1']['label'], '2')

        H = nx.parse_graphml(data)
        assert_equal(list(H.edges()), [('n0', 'n1')])
        assert_equal(H['n0']['n1']['id'], 'e0')
        assert_equal(H.nodes['n0']['label'], '1')
        assert_equal(H.nodes['n1']['label'], '2')

    def test_bool(self):
        s = """ false true false FaLsE True 0 1 """
        fh = io.BytesIO(s.encode('UTF-8'))
        G = nx.read_graphml(fh)
        H = nx.parse_graphml(s)
        for graph in [G, H]:
            assert_equal(graph.nodes['n0']['test'], True)
            assert_equal(graph.nodes['n2']['test'], False)
            assert_equal(graph.nodes['n3']['test'], False)
            assert_equal(graph.nodes['n4']['test'], True)
            assert_equal(graph.nodes['n5']['test'], False)
            assert_equal(graph.nodes['n6']['test'], True)

    def test_graphml_header_line(self):
        good = """ false true """
        bad = """ false true """
        ugly = """ false true """
        # Both the "good" and the "bad" header must be readable...
        for s in (good, bad):
            fh = io.BytesIO(s.encode('UTF-8'))
            G = nx.read_graphml(fh)
            H = nx.parse_graphml(s)
            for graph in [G, H]:
                assert_equal(graph.nodes['n0']['test'], True)

        # ...while the "ugly" one must be rejected.
        fh = io.BytesIO(ugly.encode('UTF-8'))
        assert_raises(nx.NetworkXError, nx.read_graphml, fh)
        assert_raises(nx.NetworkXError, nx.parse_graphml, ugly)

    def test_read_attributes_with_groups(self):
        data = """\ 2 Group 3 Folder 3 Group 1 Folder 1 1 3 Group 2 Folder 2 5 6 9 """
        # verify that nodes / attributes are correctly read when part of a group
        fh = io.BytesIO(data.encode('UTF-8'))
        G = nx.read_graphml(fh)
        node_attrs = [attrs for _, attrs in G.nodes(data=True)]
        assert_equal(len(node_attrs), 9)
        for node_data in node_attrs:
            assert_not_equal(node_data['CustomProperty'], '')


class TestWriteGraphML(BaseGraphML):
    """Write/read round-trip tests, run against the lxml-based writer.

    Subclasses override ``writer`` to run the same tests against another
    writer implementation.
    """

    writer = staticmethod(nx.write_graphml_lxml)

    @classmethod
    def setupClass(cls):
        try:
            import lxml.etree
        except ImportError:
            raise SkipTest('lxml.etree not available.')

    @staticmethod
    def _numpy_or_skip():
        """Return the numpy module, skipping the calling test if absent."""
        try:
            import numpy as np
        except ImportError:
            raise SkipTest('numpy not available.')
        return np

    def _roundtrip_via_file(self, G, **read_kwargs):
        """Write ``G`` to a temporary file with ``self.writer`` and read it back.

        ``read_kwargs`` are forwarded to ``nx.read_graphml``. The temporary
        file is closed and removed even if writing or reading raises.
        """
        fd, fname = tempfile.mkstemp()
        try:
            self.writer(G, fname)
            return nx.read_graphml(fname, **read_kwargs)
        finally:
            os.close(fd)
            os.unlink(fname)

    def test_write_interface(self):
        try:
            import lxml.etree
        except ImportError:
            assert_equal(nx.write_graphml, nx.write_graphml_xml)
        else:
            assert_equal(nx.write_graphml, nx.write_graphml_lxml)

    def test_write_read_simple_directed_graphml(self):
        G = self.simple_directed_graph
        G.graph['hi'] = 'there'
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert_equal(sorted(G.nodes()), sorted(H.nodes()))
        assert_equal(sorted(G.edges()), sorted(H.edges()))
        assert_equal(sorted(G.edges(data=True)), sorted(H.edges(data=True)))
        self.simple_directed_fh.seek(0)

    def test_write_read_attribute_numeric_type_graphml(self):
        from xml.etree.ElementTree import parse

        G = self.attribute_numeric_type_graph
        fh = io.BytesIO()
        self.writer(G, fh, infer_numeric_types=True)
        fh.seek(0)
        H = nx.read_graphml(fh)
        fh.seek(0)

        assert_nodes_equal(G.nodes(), H.nodes())
        assert_edges_equal(G.edges(), H.edges())
        assert_edges_equal(G.edges(data=True), H.edges(data=True))
        self.attribute_numeric_type_fh.seek(0)

        xml = parse(fh)
        # Children are the key elements, and the graph element.
        # Element.getchildren() was removed in Python 3.9; iterating the
        # element yields the same direct children.
        children = list(xml.getroot())
        assert_equal(len(children), 3)

        keys = [child.items() for child in children[:2]]

        assert_equal(len(keys), 2)
        assert_in(('attr.type', 'double'), keys[0])
        assert_in(('attr.type', 'double'), keys[1])

    def test_more_multigraph_keys(self):
        """Writing keys as edge id attributes means keys become strings.
        The original keys are stored as data, so read them back in
        if `make_str(key) == edge_id`
        This allows the adjacency to remain the same.
        """
        G = nx.MultiGraph()
        G.add_edges_from([('a', 'b', 2), ('a', 'b', 3)])
        H = self._roundtrip_via_file(G)
        assert_true(H.is_multigraph())
        assert_edges_equal(G.edges(keys=True), H.edges(keys=True))
        assert_equal(G._adj, H._adj)

    def test_default_attribute(self):
        G = nx.Graph(name="Fred")
        G.add_node(1, label=1, color='green')
        nx.add_path(G, [0, 1, 2, 3])
        G.add_edge(1, 2, weight=3)
        G.graph['node_default'] = {'color': 'yellow'}
        G.graph['edge_default'] = {'weight': 7}
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh, node_type=int)
        assert_nodes_equal(G.nodes(), H.nodes())
        assert_edges_equal(G.edges(), H.edges())
        assert_equal(G.graph, H.graph)

    def test_multigraph_to_graph(self):
        # test converting multigraph to graph if no parallel edges found
        G = nx.MultiGraph()
        G.add_edges_from([('a', 'b', 2), ('b', 'c', 3)])  # no multiedges
        H = self._roundtrip_via_file(G)
        assert_false(H.is_multigraph())

    def test_numpy_float(self):
        # Still only run when numpy is installed, as before.
        self._numpy_or_skip()
        # ``np.float`` was merely an alias for the builtin ``float`` and was
        # removed in NumPy 1.24, so use the builtin directly.
        wt = float(3.4)
        G = nx.Graph([(1, 2, {'weight': wt})])
        H = self._roundtrip_via_file(G, node_type=int)
        assert_equal(G._adj, H._adj)

    def test_numpy_float64(self):
        np = self._numpy_or_skip()
        wt = np.float64(3.4)
        G = nx.Graph([(1, 2, {'weight': wt})])
        H = self._roundtrip_via_file(G, node_type=int)
        assert_equal(G.edges, H.edges)
        wtG = G[1][2]['weight']
        wtH = H[1][2]['weight']
        assert_almost_equal(wtG, wtH, places=6)
        assert_equal(type(wtG), np.float64)
        assert_equal(type(wtH), float)

    def test_numpy_float32(self):
        np = self._numpy_or_skip()
        wt = np.float32(3.4)
        G = nx.Graph([(1, 2, {'weight': wt})])
        H = self._roundtrip_via_file(G, node_type=int)
        assert_equal(G.edges, H.edges)
        wtG = G[1][2]['weight']
        wtH = H[1][2]['weight']
        assert_almost_equal(wtG, wtH, places=6)
        assert_equal(type(wtG), np.float32)
        assert_equal(type(wtH), float)

    def test_unicode_attributes(self):
        G = nx.Graph()
        try:  # Python 3.x
            name1 = chr(2344) + chr(123) + chr(6543)
            name2 = chr(5543) + chr(1543) + chr(324)
            node_type = str
        except ValueError:  # Python 2.6+
            name1 = unichr(2344) + unichr(123) + unichr(6543)
            name2 = unichr(5543) + unichr(1543) + unichr(324)
            node_type = unicode
        G.add_edge(name1, 'Radiohead', foo=name2)
        H = self._roundtrip_via_file(G, node_type=node_type)
        assert_equal(G._adj, H._adj)

    def test_unicode_escape(self):
        # test for handling json escaped stings in python 2 Issue #1880
        import json
        a = dict(a='{"a": "123"}')  # an object with many chars to escape
        try:  # Python 3.x
            chr(2344)
            sa = json.dumps(a)
        except ValueError:  # Python 2.6+
            sa = unicode(json.dumps(a))
        G = nx.Graph()
        G.graph['test'] = sa
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert_equal(G.graph['test'], H.graph['test'])


class TestXMLGraphML(TestWriteGraphML):
    """Re-run the write/read tests against the ElementTree-based writer."""

    writer = staticmethod(nx.write_graphml_xml)

    @classmethod
    def setupClass(cls):
        try:
            import xml.etree.ElementTree
        except ImportError:
            raise SkipTest('xml.etree.ElementTree not available.')