# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import base64
import codecs
import gzip
import json
import re

try:
  import StringIO
  _BytesIO = StringIO.StringIO
except ImportError:
  # Python 3 has no StringIO module; io.BytesIO is the in-memory byte buffer.
  import io
  _BytesIO = io.BytesIO

GZIP_HEADER_BYTES = b'\x1f\x8b'

# Regular expressions for matching the beginning and end of trace data in HTML
# traces. See tracing/extras/importer/trace2html_importer.html.
TRACE_DATA_START_LINE_RE = re.compile(
    r'^<\s*script id="viewer-data" type="(application\/json|text\/plain)">$')
TRACE_DATA_END_LINE_RE = re.compile(r'^<\/\s*script>$')


def CopyTraceDataFromHTMLFilePath(html_path, trace_path, gzipped_output=False):
  """Copies trace data from an existing HTML file into new trace file(s).

  If |html_path| doesn't contain any trace data blocks, this function throws an
  exception. If |html_path| contains more than one trace data block, the first
  block will be extracted into |trace_path| and the rest will be extracted
  into separate files |trace_path|.1, |trace_path|.2, etc.

  The contents of each trace data block is base64-decoded. If |gzipped_output|
  is false, gzipped blocks are inflated before they are stored; if it is true,
  blocks that are not already gzipped are compressed, so every output file is
  gzipped.

  Args:
    html_path: Path of the HTML trace file to read.
    trace_path: Path of the first output file; further blocks get a numeric
        suffix.
    gzipped_output: Whether the output files should hold gzipped data.

  Returns:
    A list of paths of the saved trace files
    ([|trace_path|, |trace_path|.1, |trace_path|.2, ...]).

  Raises:
    Exception: If |html_path| has no trace data block, or a block is not
        terminated by a closing script line.
  """
  trace_data_list = _ExtractTraceDataFromHTMLFile(
      html_path, unzip_data=not gzipped_output)
  saved_paths = []
  for i, trace_data in enumerate(trace_data_list):
    saved_path = trace_path if i == 0 else '%s.%d' % (trace_path, i)
    saved_paths.append(saved_path)
    # The decoded payload is always a byte string, so it is written in binary
    # mode: this stores it verbatim and also works where text-mode files
    # reject bytes.
    with open(saved_path, 'wb') as trace_file:
      trace_file.write(trace_data.read())
  return saved_paths


def ReadTracesFromHTMLFilePath(html_path):
  """Returns a list of inflated JSON traces extracted from an HTML file.

  Raises:
    Exception: If |html_path| has no trace data block, or a block is not
        terminated by a closing script line.
  """
  # A list comprehension (rather than map) guarantees a real list is returned
  # even where map() is lazy.
  return [json.load(trace_file)
          for trace_file in _ExtractTraceDataFromHTMLFile(html_path)]


def _ExtractTraceDataFromHTMLFile(html_path, unzip_data=True):
  """Returns file-like objects holding the decoded trace blocks of an HTML file.

  Args:
    html_path: Path of the HTML trace file to read (decoded as UTF-8).
    unzip_data: If true, gzipped blocks are wrapped so that reading them yields
        inflated data; if false, blocks that are not gzipped are compressed.

  Returns:
    A list with one readable binary file-like object per trace data block, in
    the order the blocks appear in the file.

  Raises:
    Exception: If no trace data block is found, or a block has no end line.
  """
  with codecs.open(html_path, mode='r', encoding='utf-8') as html_file:
    lines = html_file.readlines()

  start_indices = [i for i, line in enumerate(lines)
                   if TRACE_DATA_START_LINE_RE.match(line)]
  if not start_indices:
    raise Exception('File %r does not contain trace data' % html_path)

  decoded_data_list = []
  for start_index in start_indices:
    end_index = _FindTraceDataEndIndex(lines, start_index, html_path)
    # readlines() keeps each line's terminator, so no separator is needed; the
    # base64 decoder discards the embedded newlines.
    encoded_data = ''.join(lines[start_index + 1:end_index]).strip()
    decoded_data_list.append(_BytesIO(base64.b64decode(encoded_data)))

  convert = _UnzipFileIfNecessary if unzip_data else _ZipFileIfNecessary
  return [convert(decoded_data) for decoded_data in decoded_data_list]


def _FindTraceDataEndIndex(lines, start_index, html_path):
  """Returns the index of the first end line after |start_index|.

  Raises:
    Exception: If no line after |start_index| closes the trace data block.
  """
  for i in range(start_index + 1, len(lines)):
    if TRACE_DATA_END_LINE_RE.match(lines[i]):
      return i
  raise Exception(
      'Trace data block starting at line %d of file %r is not terminated'
      % (start_index + 1, html_path))


def _UnzipFileIfNecessary(original_file):
  """Returns a file-like object that yields the inflated contents of a file.

  Gzipped input is wrapped in a gzip reader; anything else is returned as is.
  """
  if _IsFileZipped(original_file):
    return gzip.GzipFile(fileobj=original_file)
  return original_file


def _ZipFileIfNecessary(original_file):
  """Returns a file-like object holding the gzipped contents of a file.

  Input that is already gzipped is returned as is; anything else is compressed
  into a new in-memory buffer positioned at its start.
  """
  if _IsFileZipped(original_file):
    return original_file
  zipped_file = _BytesIO()
  with gzip.GzipFile(fileobj=zipped_file, mode='wb') as gzip_wrapper:
    gzip_wrapper.write(original_file.read())
  # Rewind so that callers read the compressed data from the beginning.
  zipped_file.seek(0)
  return zipped_file


def _IsFileZipped(f):
  """Returns whether |f| starts with the gzip magic bytes.

  The file is rewound to position 0 afterwards, so |f| must be seekable.
  """
  is_gzipped = f.read(len(GZIP_HEADER_BYTES)) == GZIP_HEADER_BYTES
  f.seek(0)
  return is_gzipped