# Copyright 2018 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ======================================================================== """Tensor Tracer report generation utilities.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import collections import hashlib import os from tensorflow.python.platform import gfile from tensorflow.python.platform import tf_logging as logging from tensorflow.python.tpu import tensor_tracer_pb2 _TRACER_LOG_PREFIX = ' [>>>TT>>>]' _MARKER_SECTION_BEGIN = '!!!!!!! section-begin:' _MARKER_SECTION_END = '!!!!!!! section-end:' _SECTION_NAME_CONFIG = 'configuration' _SECTION_NAME_REASON = 'reason' _SECTION_NAME_OP_LIST = 'op-list' _SECTION_NAME_TENSOR_LIST = 'tensor-list' _SECTION_NAME_CACHE_INDEX_MAP = 'cache-index-map' _SECTION_NAME_GRAPH = 'graph' _SECTION_NAME_TENSOR_TRACER_CHECKPOINT = 'tensor_tracer_checkpoint' _FIELD_NAME_VERSION = 'version:' _FIELD_NAME_DEVICE = 'device:' _FIELD_NAME_TRACE_MODE = 'trace-mode:' _FIELD_NAME_SUBMODE = 'submode:' _FIELD_NAME_NUM_REPLICAS = 'num-replicas:' _FIELD_NAME_NUM_REPLICAS_PER_HOST = 'num-replicas-per-host:' _FIELD_NAME_NUM_HOSTS = 'num-hosts:' _FIELD_NAME_NUM_OPS = 'number-of-ops:' _FIELD_NAME_NUM_TENSORS = 'number-of-tensors:' _FIELD_NAME_NUM_CACHE_INDICES = 'number-of-indices:' _FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED = 'topological-sort-succeed:' _CURRENT_VERSION = 'use-outside-compilation' _TT_REPORT_PROTO = 'tensor_tracer_report.report_pb' def report_proto_path(trace_dir): """Returns the path where report proto should be written. Args: trace_dir: String denoting the trace directory. Returns: A string denoting the path to the report proto. """ return os.path.join(trace_dir, _TT_REPORT_PROTO) def topological_sort(g): """Performs topological sort on the given graph. Args: g: the graph. Returns: A pair where the first element indicates if the topological sort succeeded (True if there is no cycle found; False if a cycle is found) and the second element is either the sorted list of nodes or the cycle of nodes found. """ def _is_loop_edge(op): """Returns true if the op is the end of a while-loop creating a cycle.""" return op.type in ['NextIteration'] def _in_op_degree(op): """Returns the number of incoming edges to the given op. The edge calculation skips the edges that come from 'NextIteration' ops. NextIteration creates a cycle in the graph. We break cycles by treating this op as 'sink' and ignoring all outgoing edges from it. Args: op: Tf.Operation Returns: the number of incoming edges. """ count = 0 for op in op.control_inputs + [in_tensor.op for in_tensor in op.inputs]: if not _is_loop_edge(op): count += 1 return count sorted_ops = [] op_in_degree = {op: _in_op_degree(op) for op in g.get_operations()} frontier = [op for (op, degree) in op_in_degree.items() if degree == 0] frontier.sort(key=lambda op: op.name) while frontier: op = frontier.pop() # Remove the op from graph, and remove its outgoing edges. sorted_ops.append(op) if _is_loop_edge(op): continue # pylint: disable=protected-access consumers = list(op._control_outputs) # pylint: enable=protected-access for out_tensor in op.outputs: consumers += [consumer_op for consumer_op in out_tensor.consumers()] consumers.sort(key=lambda op: op.name) for consumer in consumers: # For each deleted edge shift the bucket of the vertex. op_in_degree[consumer] -= 1 if op_in_degree[consumer] == 0: frontier.append(consumer) if op_in_degree[consumer] < 0: raise ValueError('consumer:%s degree mismatch'%consumer.name) left_ops = set(op for (op, degree) in op_in_degree.items() if degree > 0) if left_ops: return (True, left_ops) else: assert len(g.get_operations()) == len(sorted_ops) return (False, sorted_ops) class TensorTracerConfig(object): """Tensor Tracer config object.""" def __init__(self): self.version = _CURRENT_VERSION self.device_type = None self.num_replicas = None self.num_replicas_per_host = None self.num_hosts = None class TensorTraceOrder(object): """Class that is responsible from storing the trace-id of the tensors.""" def __init__(self, graph_order, traced_tensors): self.graph_order = graph_order self.traced_tensors = traced_tensors self._create_tensor_maps() def _create_tensor_maps(self): """Creates tensor to cache id maps.""" self.tensorname_to_cache_idx = {} self.cache_idx_to_tensor_idx = [] for out_tensor in self.traced_tensors: tensor_name = out_tensor.name if tensor_name in self.tensorname_to_cache_idx: raise ValueError( 'Tensor name %s should not be already in ' 'tensorname_to_cache_idx'%tensor_name) if tensor_name not in self.graph_order.tensor_to_idx: raise ValueError( 'Tensor name %s is not in the tensor_to_idx'%tensor_name) tensor_idx = self.graph_order.tensor_to_idx[tensor_name] cache_idx = len(self.tensorname_to_cache_idx) self.tensorname_to_cache_idx[tensor_name] = cache_idx self.cache_idx_to_tensor_idx.append(tensor_idx) if len(self.tensorname_to_cache_idx) != len( self.cache_idx_to_tensor_idx): raise RuntimeError('len(self.tensorname_to_cache_idx) != ' 'len(self.cache_idx_to_tensor_idx') def sort_tensors_and_ops(graph): """Returns a wrapper that has consistent tensor and op orders.""" graph_wrapper = collections.namedtuple('GraphWrapper', ['graph', 'operations', 'op_to_idx', 'tensors', 'tensor_to_idx', 'contains_cycle', 'topological_order_or_cycle']) contains_cycle, topological_order_or_cycle = topological_sort(graph) if not contains_cycle: operations = topological_order_or_cycle else: operations = graph.get_operations() op_to_idx = {op.name: index for index, op in enumerate(operations)} tensors = [] for op in operations: tensors.extend(op.outputs) tensor_to_idx = {tensor.name: index for index, tensor in enumerate(tensors)} return graph_wrapper(graph=graph, operations=operations, op_to_idx=op_to_idx, tensors=tensors, tensor_to_idx=tensor_to_idx, contains_cycle=contains_cycle, topological_order_or_cycle=topological_order_or_cycle) class OpenReportFile(object): """Context manager for writing report file.""" def __init__(self, tt_parameters): if not tt_parameters.report_file_path: self._report_file = None return try: self._report_file = gfile.Open(tt_parameters.report_file_path, 'w') except IOError as e: raise e def __enter__(self): return self._report_file def __exit__(self, unused_type, unused_value, unused_traceback): if self._report_file: self._report_file.close() def proto_fingerprint(message_proto): serialized_message = message_proto.SerializeToString() hasher = hashlib.sha256(serialized_message) return hasher.hexdigest() class TTReportHandle(object): """Utility class responsible from creating a tensor tracer report.""" def __init__(self): self.instrument_records = {} self._report_file = None def instrument(self, name, explanation): self.instrument_records[name] = explanation def instrument_op(self, op, explanation): self.instrument(op.name, explanation) def instrument_tensor(self, tensor, explanation): self.instrument(tensor.name, explanation) def create_report_proto(self, tt_config, tt_parameters, tensor_trace_order, tensor_trace_points, collected_signature_types): """Creates and returns a proto that stores tensor tracer configuration. Args: tt_config: TensorTracerConfig object holding information about the run environment (device, # cores, # hosts), and tensor tracer version information. tt_parameters: TTParameters objects storing the user provided parameters for tensor tracer. tensor_trace_order: TensorTraceOrder object storing a topological order of the graph. tensor_trace_points: Progromatically added trace_points/checkpoints. collected_signature_types: The signature types collected, e,g, norm, max, min, mean... Returns: TensorTracerReport proto. """ report = tensor_tracer_pb2.TensorTracerReport() report.config.version = tt_config.version report.config.device = tt_config.device_type report.config.num_cores = tt_config.num_replicas report.config.num_hosts = tt_config.num_hosts report.config.num_cores_per_host = tt_config.num_replicas_per_host report.config.submode = tt_parameters.submode report.config.trace_mode = tt_parameters.trace_mode for signature_name, _ in sorted(collected_signature_types.items(), key=lambda x: x[1]): report.config.signatures.append(signature_name) for tensor in tensor_trace_order.graph_order.tensors: tensor_def = tensor_tracer_pb2.TensorTracerReport.TracedTensorDef() tensor_def.name = tensor.name if tensor.name in tensor_trace_order.tensorname_to_cache_idx: tensor_def.is_traced = True tensor_def.cache_index = ( tensor_trace_order.tensorname_to_cache_idx[tensor.name]) else: # To prevent small changes affecting the fingerprint calculation, avoid # writing the untraced tensors to metadata. Fingerprints will be # different only when the list of the traced tensors are different. if tt_parameters.use_fingerprint_subdir: continue tensor_def.is_traced = False if tensor.name in tensor_trace_points: tensor_def.trace_point_name = tensor_trace_points[tensor.name] if tensor.name in self.instrument_records: tensor_def.explanation = self.instrument_records[tensor.name] elif tensor.op.name in self.instrument_records: tensor_def.explanation = self.instrument_records[tensor.op.name] report.tensordef[tensor.name].CopyFrom(tensor_def) report.fingerprint = proto_fingerprint(report) logging.info('TensorTracerProto fingerprint is %s.', report.fingerprint) tf_graph = tensor_trace_order.graph_order.graph report.graphdef.CopyFrom(tf_graph.as_graph_def()) return report def write_report_proto(self, report_proto, tt_parameters): """Writes the given report proto under trace_dir.""" gfile.MakeDirs(tt_parameters.trace_dir) report_path = report_proto_path(tt_parameters.trace_dir) with gfile.GFile(report_path, 'wb') as f: f.write(report_proto.SerializeToString()) def create_report(self, tt_config, tt_parameters, tensor_trace_order, tensor_trace_points): """Creates a report file and writes the trace information.""" with OpenReportFile(tt_parameters) as self._report_file: self._write_config_section(tt_config, tt_parameters) self._write_op_list_section(tensor_trace_order.graph_order) self._write_tensor_list_section(tensor_trace_order.graph_order) self._write_trace_points(tensor_trace_points) self._write_cache_index_map_section(tensor_trace_order) self._write_reason_section() self._write_graph_section(tensor_trace_order.graph_order) def _write_trace_points(self, tensor_trace_points): """Writes the list of checkpoints.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_TENSOR_TRACER_CHECKPOINT)) for (tensor, checkpoint_name) in tensor_trace_points: self._write_report('%s %s\n'%(tensor.name, checkpoint_name)) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_TENSOR_TRACER_CHECKPOINT)) def _write_report(self, content): """Writes the given content to the report.""" line = '%s %s'%(_TRACER_LOG_PREFIX, content) if self._report_file: self._report_file.write(line) else: logging.info(line) def _write_config_section(self, tt_config, tt_parameters): """Writes the config section of the report.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_CONFIG)) self._write_report('%s %s\n'%(_FIELD_NAME_VERSION, tt_config.version)) self._write_report('%s %s\n'%(_FIELD_NAME_DEVICE, tt_config.device_type)) self._write_report('%s %s\n'%(_FIELD_NAME_TRACE_MODE, tt_parameters.trace_mode)) self._write_report('%s %s\n'%(_FIELD_NAME_SUBMODE, tt_parameters.submode)) self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS, tt_config.num_replicas)) self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS_PER_HOST, tt_config.num_replicas_per_host)) self._write_report('%s %s\n'%(_FIELD_NAME_NUM_HOSTS, tt_config.num_hosts)) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_CONFIG)) def _write_reason_section(self): """Writes the reason section of the report.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_REASON)) for key in sorted(self.instrument_records): self._write_report('"%s" %s\n'%(key, self.instrument_records[key])) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_REASON)) def _write_op_list_section(self, graph_order): """Writes the Op-list section of the report.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_OP_LIST)) self._write_report('%s %d\n'%(_FIELD_NAME_NUM_OPS, len(graph_order.operations))) for i in range(0, len(graph_order.operations)): op = graph_order.operations[i] line = '%d "%s" %s'%(i, op.name, op.type) for out_tensor in op.outputs: if out_tensor.name not in graph_order.tensor_to_idx: raise ValueError( 'out_tensor %s is not in tensor_to_idx'%out_tensor.name) line += ' %d'%graph_order.tensor_to_idx[out_tensor.name] line += '\n' self._write_report(line) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_OP_LIST)) def _write_tensor_list_section(self, graph_order): """Writes the tensor-list section of the report.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_TENSOR_LIST)) self._write_report('%s %d\n'%(_FIELD_NAME_NUM_TENSORS, len(graph_order.tensors))) for i in range(0, len(graph_order.tensors)): tensor = graph_order.tensors[i] line = '%d "%s"'%(i, tensor.name) consumers = tensor.consumers() consumers.sort(key=lambda op: op.name) for consumer_op in consumers: if consumer_op.name not in graph_order.op_to_idx: raise ValueError( 'consumer_op %s is not in op_to_idx'%consumer_op.name) line += ' %d'%graph_order.op_to_idx[consumer_op.name] line += '\n' self._write_report(line) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_TENSOR_LIST)) def _write_cache_index_map_section(self, tensor_trace_order): """Writes the mapping from cache index to tensor index to the report.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_CACHE_INDEX_MAP)) self._write_report('%s %d\n'%( _FIELD_NAME_NUM_CACHE_INDICES, len(tensor_trace_order.cache_idx_to_tensor_idx))) for cache_idx in range(0, len(tensor_trace_order.cache_idx_to_tensor_idx)): tensor_idx = tensor_trace_order.cache_idx_to_tensor_idx[cache_idx] line = '%d %d\n'%(cache_idx, tensor_idx) self._write_report(line) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_CACHE_INDEX_MAP)) def _write_graph_section(self, graph_order): """Writes the graph section of the report.""" self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_GRAPH)) self._write_report('%s %s\n'%(_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED, not graph_order.contains_cycle)) l = list(graph_order.topological_order_or_cycle) for i in range(0, len(l)): self._write_report('%d "%s"\n'%(i, l[i].name)) self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_GRAPH))