# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import Queue
import re
import subprocess
import sys
import threading
import time
import zlib
import systrace_agent
import util
# Text that ADB sends, but does not need to be displayed to the user.
ADB_IGNORE_REGEXP = r'^capturing trace\.\.\. done|^capturing trace\.\.\.'
# The number of seconds to wait on output from ADB.
ADB_STDOUT_READ_TIMEOUT = 0.2
# The adb shell command to initiate a trace.
ATRACE_BASE_ARGS = ['atrace']
# If a custom list of categories is not specified, traces will include
# these categories (if available on the device).
DEFAULT_CATEGORIES = 'sched gfx view dalvik webview input disk am wm'.split()
# The command to list trace categories.
LIST_CATEGORIES_ARGS = ATRACE_BASE_ARGS + ['--list_categories']
# Minimum number of seconds between displaying status updates.
MIN_TIME_BETWEEN_STATUS_UPDATES = 0.2
# ADB sends this text to indicate the beginning of the trace data.
TRACE_START_REGEXP = r'TRACE\:'
# Plain-text trace data should always start with this string.
TRACE_TEXT_HEADER = '# tracer'
# This list is based on the tags in frameworks/native/include/utils/Trace.h for
# legacy platform.
LEGACY_TRACE_TAG_BITS = (
('gfx', 1<<1),
('input', 1<<2),
('view', 1<<3),
('webview', 1<<4),
('wm', 1<<5),
('am', 1<<6),
('sm', 1<<7),
('audio', 1<<8),
('video', 1<<9),
('camera', 1<<10),
)
def try_create_agent(options, categories):
if options.from_file is not None:
return AtraceAgent(options, categories)
device_sdk_version = util.get_device_sdk_version()
if device_sdk_version >= 18:
return AtraceAgent(options, categories)
elif device_sdk_version >= 16:
return AtraceLegacyAgent(options, categories)
class AtraceAgent(systrace_agent.SystraceAgent):
def __init__(self, options, categories):
super(AtraceAgent, self).__init__(options, categories)
self._expect_trace = False
self._adb = None
self._trace_data = None
def start(self):
tracer_args = self._construct_trace_command()
try:
self._adb = subprocess.Popen(tracer_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError as error:
print >> sys.stderr, (
'The command "%s" failed with the following error:' %
' '.join(tracer_args))
print >> sys.stderr, ' ', error
sys.exit(1)
def collect_result(self):
trace_data = self._collect_trace_data();
if self._expect_trace:
self._trace_data = self._preprocess_trace_data(trace_data);
def expect_trace(self):
return self._expect_trace
def get_trace_data(self):
return self._trace_data
def get_class_name(self):
return 'trace-data'
def _construct_list_categories_command(self):
return util.construct_adb_shell_command(
LIST_CATEGORIES_ARGS, self._options.device_serial)
def _construct_extra_trace_command(self):
extra_args = []
if self._options.app_name is not None:
extra_args.extend(['-a', self._options.app_name])
if self._options.kfuncs is not None:
extra_args.extend(['-k', self._options.kfuncs])
if not self._categories:
self._categories = get_default_categories(self._options.device_serial)
extra_args.extend(self._categories)
return extra_args
def _construct_trace_command(self):
"""Builds a command-line used to invoke a trace process.
Returns:
A tuple where the first element is an array of command-line arguments, and
the second element is a boolean which will be true if the commend will
stream trace data.
"""
if self._options.list_categories:
tracer_args = self._construct_list_categories_command()
self._expect_trace = False
elif self._options.from_file is not None:
tracer_args = ['cat', self._options.from_file]
self._expect_trace = True
else:
atrace_args = ATRACE_BASE_ARGS[:]
self._expect_trace = True
if self._options.compress_trace_data:
atrace_args.extend(['-z'])
if ((self._options.trace_time is not None)
and (self._options.trace_time > 0)):
atrace_args.extend(['-t', str(self._options.trace_time)])
if ((self._options.trace_buf_size is not None)
and (self._options.trace_buf_size > 0)):
atrace_args.extend(['-b', str(self._options.trace_buf_size)])
extra_args = self._construct_extra_trace_command()
atrace_args.extend(extra_args)
if self._options.fix_threads:
atrace_args.extend([';', 'ps', '-t'])
tracer_args = util.construct_adb_shell_command(
atrace_args, self._options.device_serial)
return tracer_args
def _collect_trace_data(self):
# Read the output from ADB in a worker thread. This allows us to monitor
# the progress of ADB and bail if ADB becomes unresponsive for any reason.
# Limit the stdout_queue to 128 entries because we will initially be reading
# one byte at a time. When the queue fills up, the reader thread will
# block until there is room in the queue. Once we start downloading the
# trace data, we will switch to reading data in larger chunks, and 128
# entries should be plenty for that purpose.
stdout_queue = Queue.Queue(maxsize=128)
stderr_queue = Queue.Queue()
if self._expect_trace:
# Use stdout.write() (here and for the rest of this function) instead
# of print() to avoid extra newlines.
sys.stdout.write('Capturing trace...')
# Use a chunk_size of 1 for stdout so we can display the output to
# the user without waiting for a full line to be sent.
stdout_thread = FileReaderThread(self._adb.stdout, stdout_queue,
text_file=False, chunk_size=1)
stderr_thread = FileReaderThread(self._adb.stderr, stderr_queue,
text_file=True)
stdout_thread.start()
stderr_thread.start()
# Holds the trace data returned by ADB.
trace_data = []
# Keep track of the current line so we can find the TRACE_START_REGEXP.
current_line = ''
# Set to True once we've received the TRACE_START_REGEXP.
reading_trace_data = False
last_status_update_time = time.time()
while (stdout_thread.isAlive() or stderr_thread.isAlive() or
not stdout_queue.empty() or not stderr_queue.empty()):
if self._expect_trace:
last_status_update_time = status_update(last_status_update_time)
while not stderr_queue.empty():
# Pass along errors from adb.
line = stderr_queue.get()
sys.stderr.write(line)
# Read stdout from adb. The loop exits if we don't get any data for
# ADB_STDOUT_READ_TIMEOUT seconds.
while True:
try:
chunk = stdout_queue.get(True, ADB_STDOUT_READ_TIMEOUT)
except Queue.Empty:
# Didn't get any data, so exit the loop to check that ADB is still
# alive and print anything sent to stderr.
break
if reading_trace_data:
# Save, but don't print, the trace data.
trace_data.append(chunk)
else:
if not self._expect_trace:
sys.stdout.write(chunk)
else:
# Buffer the output from ADB so we can remove some strings that
# don't need to be shown to the user.
current_line += chunk
if re.match(TRACE_START_REGEXP, current_line):
# We are done capturing the trace.
sys.stdout.write('Done.\n')
# Now we start downloading the trace data.
sys.stdout.write('Downloading trace...')
current_line = ''
# Use a larger chunk size for efficiency since we no longer
# need to worry about parsing the stream.
stdout_thread.set_chunk_size(4096)
reading_trace_data = True
elif chunk == '\n' or chunk == '\r':
# Remove ADB output that we don't care about.
current_line = re.sub(ADB_IGNORE_REGEXP, '', current_line)
if len(current_line) > 1:
# ADB printed something that we didn't understand, so show it
# it to the user (might be helpful for debugging).
sys.stdout.write(current_line)
# Reset our current line.
current_line = ''
if self._expect_trace:
if reading_trace_data:
# Indicate to the user that the data download is complete.
sys.stdout.write('Done.\n')
else:
# We didn't receive the trace start tag, so something went wrong.
sys.stdout.write('ERROR.\n')
# Show any buffered ADB output to the user.
current_line = re.sub(ADB_IGNORE_REGEXP, '', current_line)
if current_line:
sys.stdout.write(current_line)
sys.stdout.write('\n')
# The threads should already have stopped, so this is just for cleanup.
stdout_thread.join()
stderr_thread.join()
self._adb.stdout.close()
self._adb.stderr.close()
# The adb process should be done since it's io pipes are closed. Call
# poll() to set the returncode.
self._adb.poll()
if self._adb.returncode != 0:
print >> sys.stderr, ('The command "%s" returned error code %d.' %
(' '.join(tracer_args), self._adb.returncode))
sys.exit(1)
return trace_data
def _preprocess_trace_data(self, trace_data):
"""Performs various processing on atrace data.
Args:
trace_data: The raw trace data.
Returns:
The processed trace data.
"""
trace_data = ''.join(trace_data)
if self._options.fix_threads:
# Extract the thread list dumped by ps.
trace_data, thread_names = extract_thread_list(trace_data)
if trace_data:
trace_data = strip_and_decompress_trace(trace_data)
if not trace_data:
print >> sys.stderr, ('No data was captured. Output file was not '
'written.')
sys.exit(1)
if self._options.fix_threads:
trace_data = fix_thread_names(trace_data, thread_names)
if self._options.fix_circular:
trace_data = fix_circular_traces(trace_data)
return trace_data
class AtraceLegacyAgent(AtraceAgent):
def _construct_list_categories_command(self):
LEGACY_CATEGORIES = """ sched - CPU Scheduling
freq - CPU Frequency
idle - CPU Idle
load - CPU Load
disk - Disk I/O (requires root)
bus - Bus utilization (requires root)
workqueue - Kernel workqueues (requires root)"""
return ["echo", LEGACY_CATEGORIES]
def start(self):
super(AtraceLegacyAgent, self).start()
if self.expect_trace():
SHELL_ARGS = ['getprop', 'debug.atrace.tags.enableflags']
output, return_code = util.run_adb_shell(SHELL_ARGS, self._options.device_serial)
flags = 0
if return_code == 0:
try:
if output.startswith('0x'):
flags = int(output, 16)
elif output.startswith('0'):
flags = int(output, 8)
else:
flags = int(output)
except ValueError, e:
pass
if flags:
tags = []
for desc, bit in LEGACY_TRACE_TAG_BITS:
if bit & flags:
tags.append(desc)
categories = tags + self._categories
print 'Collecting data with following categories:', ' '.join(categories)
def _construct_extra_trace_command(self):
extra_args = []
if not self._categories:
self._categories = ['sched', ]
if 'sched' in self._categories:
extra_args.append('-s')
if 'freq' in self._categories:
extra_args.append('-f')
if 'idle' in self._categories:
extra_args.append('-i')
if 'load' in self._categories:
extra_args.append('-l')
if 'disk' in self._categories:
extra_args.append('-d')
if 'bus' in self._categories:
extra_args.append('-u')
if 'workqueue' in self._categories:
extra_args.append('-w')
return extra_args
class FileReaderThread(threading.Thread):
"""Reads data from a file/pipe on a worker thread.
Use the standard threading. Thread object API to start and interact with the
thread (start(), join(), etc.).
"""
def __init__(self, file_object, output_queue, text_file, chunk_size=-1):
"""Initializes a FileReaderThread.
Args:
file_object: The file or pipe to read from.
output_queue: A Queue.Queue object that will receive the data
text_file: If True, the file will be read one line at a time, and
chunk_size will be ignored. If False, line breaks are ignored and
chunk_size must be set to a positive integer.
chunk_size: When processing a non-text file (text_file = False),
chunk_size is the amount of data to copy into the queue with each
read operation. For text files, this parameter is ignored.
"""
threading.Thread.__init__(self)
self._file_object = file_object
self._output_queue = output_queue
self._text_file = text_file
self._chunk_size = chunk_size
assert text_file or chunk_size > 0
def run(self):
"""Overrides Thread's run() function.
Returns when an EOF is encountered.
"""
if self._text_file:
# Read a text file one line at a time.
for line in self._file_object:
self._output_queue.put(line)
else:
# Read binary or text data until we get to EOF.
while True:
chunk = self._file_object.read(self._chunk_size)
if not chunk:
break
self._output_queue.put(chunk)
def set_chunk_size(self, chunk_size):
"""Change the read chunk size.
This function can only be called if the FileReaderThread object was
created with an initial chunk_size > 0.
Args:
chunk_size: the new chunk size for this file. Must be > 0.
"""
# The chunk size can be changed asynchronously while a file is being read
# in a worker thread. However, type of file can not be changed after the
# the FileReaderThread has been created. These asserts verify that we are
# only changing the chunk size, and not the type of file.
assert not self._text_file
assert chunk_size > 0
self._chunk_size = chunk_size
def get_default_categories(device_serial):
categories_output, return_code = util.run_adb_shell(LIST_CATEGORIES_ARGS,
device_serial)
if return_code == 0 and categories_output:
categories = [c.split('-')[0].strip()
for c in categories_output.splitlines()]
return [c for c in categories if c in DEFAULT_CATEGORIES]
return []
def status_update(last_update_time):
current_time = time.time()
if (current_time - last_update_time) >= MIN_TIME_BETWEEN_STATUS_UPDATES:
# Gathering a trace may take a while. Keep printing something so users
# don't think the script has hung.
sys.stdout.write('.')
sys.stdout.flush()
return current_time
return last_update_time
def extract_thread_list(trace_data):
"""Removes the thread list from the given trace data.
Args:
trace_data: The raw trace data (before decompression).
Returns:
A tuple containing the trace data and a map of thread ids to thread names.
"""
threads = {}
parts = re.split('USER +PID +PPID +VSIZE +RSS +WCHAN +PC +NAME',
trace_data, 1)
if len(parts) == 2:
trace_data = parts[0]
for line in parts[1].splitlines():
cols = line.split(None, 8)
if len(cols) == 9:
tid = int(cols[1])
name = cols[8]
threads[tid] = name
return (trace_data, threads)
def strip_and_decompress_trace(trace_data):
"""Fixes new-lines and decompresses trace data.
Args:
trace_data: The trace data returned by atrace.
Returns:
The decompressed trace data.
"""
# Collapse CRLFs that are added by adb shell.
if trace_data.startswith('\r\n'):
trace_data = trace_data.replace('\r\n', '\n')
elif trace_data.startswith('\r\r\n'):
# On windows, adb adds an extra '\r' character for each line.
trace_data = trace_data.replace('\r\r\n', '\n')
# Skip the initial newline.
trace_data = trace_data[1:]
if not trace_data.startswith(TRACE_TEXT_HEADER):
# No header found, so assume the data is compressed.
trace_data = zlib.decompress(trace_data)
# Enforce Unix line-endings.
trace_data = trace_data.replace('\r', '')
# Skip any initial newlines.
while trace_data and trace_data[0] == '\n':
trace_data = trace_data[1:]
return trace_data
def fix_thread_names(trace_data, thread_names):
"""Replaces thread ids with their names.
Args:
trace_data: The atrace data.
thread_names: A mapping of thread ids to thread names.
Returns:
The updated trace data.
"""
def repl(m):
tid = int(m.group(2))
if tid > 0:
name = thread_names.get(tid)
if name is None:
name = m.group(1)
if name == '<...>':
name = '<' + str(tid) + '>'
thread_names[tid] = name
return name + '-' + m.group(2)
else:
return m.group(0)
trace_data = re.sub(r'^\s*(\S+)-(\d+)', repl, trace_data,
flags=re.MULTILINE)
return trace_data
def fix_circular_traces(out):
"""Fix inconsistentcies in traces due to circular buffering.
The circular buffers are kept per CPU, so it is not guaranteed that the
beginning of a slice is overwritten before the end. To work around this, we
throw away the prefix of the trace where not all CPUs have events yet.
Args:
out: The data to fix.
Returns:
The updated trace data.
"""
# If any of the CPU's buffers have filled up and
# older events have been dropped, the kernel
# emits markers of the form '##### CPU 2 buffer started ####' on
# the line before the first event in the trace on that CPU.
#
# No such headers are emitted if there were no overflows or the trace
# was captured with non-circular buffers.
buffer_start_re = re.compile(r'^#+ CPU \d+ buffer started', re.MULTILINE)
start_of_full_trace = 0
while True:
result = buffer_start_re.search(out, start_of_full_trace + 1)
if result:
start_of_full_trace = result.start()
else:
break
if start_of_full_trace > 0:
# Need to keep the header intact to make the importer happy.
end_of_header = re.search(r'^[^#]', out, re.MULTILINE).start()
out = out[:end_of_header] + out[start_of_full_trace:]
return out