# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import Queue
import re
import subprocess
import sys
import threading
import time
import zlib

import systrace_agent
import util

# Text that ADB sends, but does not need to be displayed to the user.
ADB_IGNORE_REGEXP = r'^capturing trace\.\.\. done|^capturing trace\.\.\.'
# The number of seconds to wait on output from ADB.
ADB_STDOUT_READ_TIMEOUT = 0.2
# The adb shell command to initiate a trace.
ATRACE_BASE_ARGS = ['atrace']
# If a custom list of categories is not specified, traces will include
# these categories (if available on the device).
DEFAULT_CATEGORIES = 'sched gfx view dalvik webview input disk am wm'.split()
# The command to list trace categories.
LIST_CATEGORIES_ARGS = ATRACE_BASE_ARGS + ['--list_categories']
# Minimum number of seconds between displaying status updates.
MIN_TIME_BETWEEN_STATUS_UPDATES = 0.2
# ADB sends this text to indicate the beginning of the trace data.
TRACE_START_REGEXP = r'TRACE\:'
# Plain-text trace data should always start with this string.
TRACE_TEXT_HEADER = '# tracer'

# This list is based on the tags in frameworks/native/include/utils/Trace.h for
# legacy platform.
LEGACY_TRACE_TAG_BITS = (
  ('gfx', 1 << 1),
  ('input', 1 << 2),
  ('view', 1 << 3),
  ('webview', 1 << 4),
  ('wm', 1 << 5),
  ('am', 1 << 6),
  ('sm', 1 << 7),
  ('audio', 1 << 8),
  ('video', 1 << 9),
  ('camera', 1 << 10),
)


def try_create_agent(options, categories):
  """Creates the atrace agent that matches the trace source.

  Args:
    options: The parsed options object; from_file is read here and the whole
        object is handed to the agent.
    categories: The trace categories, passed through to the agent.

  Returns:
    An AtraceAgent when reading from a file or when the device SDK version is
    at least 18, an AtraceLegacyAgent for SDK versions 16 and 17, and None
    for anything older.
  """
  if options.from_file is not None:
    return AtraceAgent(options, categories)

  device_sdk_version = util.get_device_sdk_version()
  if device_sdk_version >= 18:
    return AtraceAgent(options, categories)
  elif device_sdk_version >= 16:
    return AtraceLegacyAgent(options, categories)
  return None


class AtraceAgent(systrace_agent.SystraceAgent):
  """Runs a trace command in a subprocess and collects its output.

  NOTE(review): self._options and self._categories are read throughout this
  class but never assigned here; presumably the SystraceAgent base class sets
  them from the constructor arguments -- confirm.
  """

  def __init__(self, options, categories):
    super(AtraceAgent, self).__init__(options, categories)
    # True when the command started by start() is expected to emit a trace.
    self._expect_trace = False
    # The subprocess.Popen object created by start().
    self._adb = None
    # The processed trace data, filled in by collect_result().
    self._trace_data = None
    # The command line passed to Popen, kept so that error messages printed
    # after start() has returned can still name the command.
    self._tracer_args = None

  def start(self):
    """Launches the trace command; exits the process if it cannot start."""
    tracer_args = self._construct_trace_command()
    self._tracer_args = tracer_args
    try:
      self._adb = subprocess.Popen(tracer_args, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
    except OSError as error:
      sys.stderr.write(
          'The command "%s" failed with the following error:\n' %
          ' '.join(tracer_args))
      sys.stderr.write('  %s\n' % (error,))
      sys.exit(1)

  def collect_result(self):
    """Reads the command output and, if a trace is expected, processes it."""
    trace_data = self._collect_trace_data()
    if self._expect_trace:
      self._trace_data = self._preprocess_trace_data(trace_data)

  def expect_trace(self):
    """Returns True if the started command is expected to produce a trace."""
    return self._expect_trace

  def get_trace_data(self):
    """Returns the processed trace data (None until collect_result() ran)."""
    return self._trace_data

  def get_class_name(self):
    """Returns the fixed identifier 'trace-data' for this agent's output."""
    return 'trace-data'

  def _construct_list_categories_command(self):
    """Builds the adb shell command that lists the trace categories."""
    return util.construct_adb_shell_command(
        LIST_CATEGORIES_ARGS, self._options.device_serial)

  def _construct_extra_trace_command(self):
    """Builds the app, kernel-function and category arguments for atrace.

    If no categories were given, self._categories is replaced by the default
    categories reported as available by the device.

    Returns:
      A list of extra command-line arguments.
    """
    extra_args = []
    if self._options.app_name is not None:
      extra_args.extend(['-a', self._options.app_name])

    if self._options.kfuncs is not None:
      extra_args.extend(['-k', self._options.kfuncs])

    if not self._categories:
      self._categories = get_default_categories(self._options.device_serial)
    extra_args.extend(self._categories)
    return extra_args

  def _construct_trace_command(self):
    """Builds a command-line used to invoke a trace process.

    As a side effect, sets self._expect_trace to indicate whether the command
    is expected to stream trace data.

    Returns:
      A list of command-line arguments.
    """
    if self._options.list_categories:
      tracer_args = self._construct_list_categories_command()
      self._expect_trace = False
    elif self._options.from_file is not None:
      tracer_args = ['cat', self._options.from_file]
      self._expect_trace = True
    else:
      atrace_args = ATRACE_BASE_ARGS[:]
      self._expect_trace = True
      if self._options.compress_trace_data:
        atrace_args.extend(['-z'])

      if ((self._options.trace_time is not None)
          and (self._options.trace_time > 0)):
        atrace_args.extend(['-t', str(self._options.trace_time)])
      if ((self._options.trace_buf_size is not None)
          and (self._options.trace_buf_size > 0)):
        atrace_args.extend(['-b', str(self._options.trace_buf_size)])
      extra_args = self._construct_extra_trace_command()
      atrace_args.extend(extra_args)

      if self._options.fix_threads:
        # Chain a thread listing after the trace; extract_thread_list() later
        # splits it off the end of the captured output.
        atrace_args.extend([';', 'ps', '-t'])
      tracer_args = util.construct_adb_shell_command(
          atrace_args, self._options.device_serial)

    return tracer_args

  def _collect_trace_data(self):
    """Reads stdout/stderr of the running command until both reach EOF.

    Exits the process if the command finishes with a non-zero return code.

    Returns:
      A list of the chunks read from stdout after the trace start marker.
    """
    # Read the output from ADB in a worker thread.  This allows us to monitor
    # the progress of ADB and bail if ADB becomes unresponsive for any reason.

    # Limit the stdout_queue to 128 entries because we will initially be
    # reading one byte at a time.  When the queue fills up, the reader thread
    # will block until there is room in the queue.  Once we start downloading
    # the trace data, we will switch to reading data in larger chunks, and 128
    # entries should be plenty for that purpose.
    stdout_queue = Queue.Queue(maxsize=128)
    stderr_queue = Queue.Queue()

    if self._expect_trace:
      # Use stdout.write() (here and for the rest of this function) instead
      # of print() to avoid extra newlines.
      sys.stdout.write('Capturing trace...')

    # Use a chunk_size of 1 for stdout so we can display the output to
    # the user without waiting for a full line to be sent.
    stdout_thread = FileReaderThread(self._adb.stdout, stdout_queue,
                                     text_file=False, chunk_size=1)
    stderr_thread = FileReaderThread(self._adb.stderr, stderr_queue,
                                     text_file=True)
    stdout_thread.start()
    stderr_thread.start()

    # Holds the trace data returned by ADB.
    trace_data = []
    # Keep track of the current line so we can find the TRACE_START_REGEXP.
    current_line = ''
    # Set to True once we've received the TRACE_START_REGEXP.
    reading_trace_data = False

    last_status_update_time = time.time()

    while (stdout_thread.is_alive() or stderr_thread.is_alive() or
           not stdout_queue.empty() or not stderr_queue.empty()):
      if self._expect_trace:
        last_status_update_time = status_update(last_status_update_time)

      while not stderr_queue.empty():
        # Pass along errors from adb.
        line = stderr_queue.get()
        sys.stderr.write(line)

      # Read stdout from adb.  The loop exits if we don't get any data for
      # ADB_STDOUT_READ_TIMEOUT seconds.
      while True:
        try:
          chunk = stdout_queue.get(True, ADB_STDOUT_READ_TIMEOUT)
        except Queue.Empty:
          # Didn't get any data, so exit the loop to check that ADB is still
          # alive and print anything sent to stderr.
          break

        if reading_trace_data:
          # Save, but don't print, the trace data.
          trace_data.append(chunk)
        else:
          if not self._expect_trace:
            sys.stdout.write(chunk)
          else:
            # Buffer the output from ADB so we can remove some strings that
            # don't need to be shown to the user.
            current_line += chunk
            if re.match(TRACE_START_REGEXP, current_line):
              # We are done capturing the trace.
              sys.stdout.write('Done.\n')
              # Now we start downloading the trace data.
              sys.stdout.write('Downloading trace...')
              current_line = ''
              # Use a larger chunk size for efficiency since we no longer
              # need to worry about parsing the stream.
              stdout_thread.set_chunk_size(4096)
              reading_trace_data = True
            elif chunk == '\n' or chunk == '\r':
              # Remove ADB output that we don't care about.
              current_line = re.sub(ADB_IGNORE_REGEXP, '', current_line)
              if len(current_line) > 1:
                # ADB printed something that we didn't understand, so show
                # it to the user (might be helpful for debugging).
                sys.stdout.write(current_line)
              # Reset our current line.
              current_line = ''

    if self._expect_trace:
      if reading_trace_data:
        # Indicate to the user that the data download is complete.
        sys.stdout.write('Done.\n')
      else:
        # We didn't receive the trace start tag, so something went wrong.
        sys.stdout.write('ERROR.\n')
        # Show any buffered ADB output to the user.
        current_line = re.sub(ADB_IGNORE_REGEXP, '', current_line)
        if current_line:
          sys.stdout.write(current_line)
          sys.stdout.write('\n')

    # The threads should already have stopped, so this is just for cleanup.
    stdout_thread.join()
    stderr_thread.join()

    self._adb.stdout.close()
    self._adb.stderr.close()

    # Both pipes have reached EOF, so the process is finished or about to be.
    # wait() (rather than poll()) guarantees that returncode is an integer
    # even if the child has not been reaped yet.
    self._adb.wait()

    if self._adb.returncode != 0:
      sys.stderr.write(
          'The command "%s" returned error code %d.\n' %
          (' '.join(self._tracer_args or []), self._adb.returncode))
      sys.exit(1)

    return trace_data

  def _preprocess_trace_data(self, trace_data):
    """Performs various processing on atrace data.

    Exits the process if no trace data remains after stripping and
    decompression.

    Args:
      trace_data: The raw trace data, as a list of string chunks.
    Returns:
      The processed trace data as a single string.
    """
    trace_data = ''.join(trace_data)

    thread_names = None
    if self._options.fix_threads:
      # Extract the thread list dumped by ps.
      trace_data, thread_names = extract_thread_list(trace_data)

    if trace_data:
      trace_data = strip_and_decompress_trace(trace_data)

    if not trace_data:
      sys.stderr.write('No data was captured. Output file was not '
                       'written.\n')
      sys.exit(1)

    if self._options.fix_threads:
      trace_data = fix_thread_names(trace_data, thread_names)

    if self._options.fix_circular:
      trace_data = fix_circular_traces(trace_data)

    return trace_data


class AtraceLegacyAgent(AtraceAgent):
  """AtraceAgent variant that uses single-letter atrace category flags."""

  def _construct_list_categories_command(self):
    """Builds a local echo command that prints the fixed category list."""
    # Names are right-aligned in a 12-character column, one entry per line.
    legacy_categories = '\n'.join([
        '       sched - CPU Scheduling',
        '        freq - CPU Frequency',
        '        idle - CPU Idle',
        '        load - CPU Load',
        '        disk - Disk I/O (requires root)',
        '         bus - Bus utilization (requires root)',
        '   workqueue - Kernel workqueues (requires root)',
    ])
    return ["echo", legacy_categories]

  def start(self):
    """Starts the trace and reports the categories being collected.

    The categories reported are those enabled through the
    debug.atrace.tags.enableflags property plus the requested categories.
    Nothing is reported when the property is unset, zero or unparsable.
    """
    super(AtraceLegacyAgent, self).start()
    if self.expect_trace():
      shell_args = ['getprop', 'debug.atrace.tags.enableflags']
      output, return_code = util.run_adb_shell(shell_args,
                                               self._options.device_serial)
      flags = 0
      if return_code == 0:
        try:
          if output.startswith('0x'):
            flags = int(output, 16)
          elif output.startswith('0'):
            flags = int(output, 8)
          else:
            flags = int(output)
        except ValueError:
          # Best effort: an unparsable property is treated as "no flags".
          pass

      if flags:
        tags = [desc for desc, bit in LEGACY_TRACE_TAG_BITS if bit & flags]
        categories = tags + self._categories
        sys.stdout.write('Collecting data with following categories: %s\n' %
                         ' '.join(categories))

  def _construct_extra_trace_command(self):
    """Maps the requested categories to single-letter atrace flags.

    Defaults self._categories to ['sched'] when no categories were given.

    Returns:
      A list of flags, in the fixed order of the table below.
    """
    category_flags = (
        ('sched', '-s'),
        ('freq', '-f'),
        ('idle', '-i'),
        ('load', '-l'),
        ('disk', '-d'),
        ('bus', '-u'),
        ('workqueue', '-w'),
    )
    if not self._categories:
      self._categories = ['sched', ]
    return [flag for category, flag in category_flags
            if category in self._categories]


class FileReaderThread(threading.Thread):
  """Reads data from a file/pipe on a worker thread.

  Use the standard threading.Thread object API to start and interact with the
  thread (start(), join(), etc.).
  """

  def __init__(self, file_object, output_queue, text_file, chunk_size=-1):
    """Initializes a FileReaderThread.

    Args:
      file_object: The file or pipe to read from.
      output_queue: A Queue.Queue object that will receive the data
      text_file: If True, the file will be read one line at a time, and
          chunk_size will be ignored.  If False, line breaks are ignored and
          chunk_size must be set to a positive integer.
      chunk_size: When processing a non-text file (text_file = False),
          chunk_size is the amount of data to copy into the queue with each
          read operation.  For text files, this parameter is ignored.
    """
    threading.Thread.__init__(self)
    self._file_object = file_object
    self._output_queue = output_queue
    self._text_file = text_file
    self._chunk_size = chunk_size
    assert text_file or chunk_size > 0

  def run(self):
    """Overrides Thread's run() function.

    Returns when an EOF is encountered.
    """
    if self._text_file:
      # Read a text file one line at a time.
      for line in self._file_object:
        self._output_queue.put(line)
    else:
      # Read binary or text data until we get to EOF.
      while True:
        chunk = self._file_object.read(self._chunk_size)
        if not chunk:
          break
        self._output_queue.put(chunk)

  def set_chunk_size(self, chunk_size):
    """Change the read chunk size.

    This function can only be called if the FileReaderThread object was
    created with an initial chunk_size > 0.
    Args:
      chunk_size: the new chunk size for this file.  Must be > 0.
    """
    # The chunk size can be changed asynchronously while a file is being read
    # in a worker thread.  However, type of file can not be changed after the
    # the FileReaderThread has been created.  These asserts verify that we are
    # only changing the chunk size, and not the type of file.
    assert not self._text_file
    assert chunk_size > 0
    self._chunk_size = chunk_size


def get_default_categories(device_serial):
  """Returns the default categories that the device reports as available.

  Args:
    device_serial: The device serial passed through to util.run_adb_shell.

  Returns:
    The categories listed by the device that also appear in
    DEFAULT_CATEGORIES, in the order listed by the device; an empty list if
    the listing command fails or prints nothing.
  """
  categories_output, return_code = util.run_adb_shell(LIST_CATEGORIES_ARGS,
                                                      device_serial)

  if return_code == 0 and categories_output:
    # Each line is treated as "<name> - <description>"; keep only the name.
    categories = [c.split('-')[0].strip()
                  for c in categories_output.splitlines()]
    return [c for c in categories if c in DEFAULT_CATEGORIES]

  return []


def status_update(last_update_time):
  """Prints a progress dot if enough time passed since the last one.

  Args:
    last_update_time: The time.time() value of the previous status update.

  Returns:
    The current time if a dot was printed, otherwise last_update_time.
  """
  current_time = time.time()
  if (current_time - last_update_time) >= MIN_TIME_BETWEEN_STATUS_UPDATES:
    # Gathering a trace may take a while.  Keep printing something so users
    # don't think the script has hung.
    sys.stdout.write('.')
    sys.stdout.flush()
    return current_time

  return last_update_time


def extract_thread_list(trace_data):
  """Removes the thread list from the given trace data.

  Args:
    trace_data: The raw trace data (before decompression).
  Returns:
    A tuple containing the trace data and a map of thread ids to thread names.
  """
  threads = {}
  parts = re.split('USER +PID +PPID +VSIZE +RSS +WCHAN +PC +NAME',
                   trace_data, 1)
  if len(parts) == 2:
    trace_data = parts[0]
    for line in parts[1].splitlines():
      # Only rows with all nine columns are used; the name is the last
      # column and may itself contain spaces.
      cols = line.split(None, 8)
      if len(cols) == 9:
        tid = int(cols[1])
        name = cols[8]
        threads[tid] = name

  return (trace_data, threads)


def strip_and_decompress_trace(trace_data):
  """Fixes new-lines and decompresses trace data.

  Args:
    trace_data: The trace data returned by atrace.
  Returns:
    The decompressed trace data.
  """
  # Collapse CRLFs that are added by adb shell.
  if trace_data.startswith('\r\n'):
    trace_data = trace_data.replace('\r\n', '\n')
  elif trace_data.startswith('\r\r\n'):
    # On windows, adb adds an extra '\r' character for each line.
    trace_data = trace_data.replace('\r\r\n', '\n')

  # Skip the initial newline.
  trace_data = trace_data[1:]

  if not trace_data.startswith(TRACE_TEXT_HEADER):
    # No header found, so assume the data is compressed.
    trace_data = zlib.decompress(trace_data)

  # Enforce Unix line-endings.
  trace_data = trace_data.replace('\r', '')

  # Skip any initial newlines.
  return trace_data.lstrip('\n')


def fix_thread_names(trace_data, thread_names):
  """Replaces thread ids with their names.

  Names seen in the trace for ids missing from thread_names are added to
  thread_names, so the mapping is modified in place.

  Args:
    trace_data: The atrace data.
    thread_names: A mapping of thread ids to thread names.
  Returns:
    The updated trace data.
  """
  def repl(m):
    tid = int(m.group(2))
    if tid > 0:
      name = thread_names.get(tid)
      if name is None:
        name = m.group(1)
        if name == '<...>':
          name = '<' + str(tid) + '>'
        thread_names[tid] = name
      return name + '-' + m.group(2)
    else:
      return m.group(0)

  trace_data = re.sub(r'^\s*(\S+)-(\d+)', repl, trace_data,
                      flags=re.MULTILINE)
  return trace_data


def fix_circular_traces(out):
  """Fix inconsistencies in traces due to circular buffering.

  The circular buffers are kept per CPU, so it is not guaranteed that the
  beginning of a slice is overwritten before the end.  To work around this, we
  throw away the prefix of the trace where not all CPUs have events yet.

  Args:
    out: The data to fix.
  Returns:
    The updated trace data.
  """
  # If any of the CPU's buffers have filled up and
  # older events have been dropped, the kernel
  # emits markers of the form '##### CPU 2 buffer started ####' on
  # the line before the first event in the trace on that CPU.
  #
  # No such headers are emitted if there were no overflows or the trace
  # was captured with non-circular buffers.
  buffer_start_re = re.compile(r'^#+ CPU \d+ buffer started', re.MULTILINE)

  # Find the last marker; everything before it is an incomplete prefix.
  start_of_full_trace = 0
  while True:
    result = buffer_start_re.search(out, start_of_full_trace + 1)
    if result:
      start_of_full_trace = result.start()
    else:
      break

  if start_of_full_trace > 0:
    # Need to keep the header intact to make the importer happy.
    first_event = re.search(r'^[^#]', out, re.MULTILINE)
    if first_event is not None:
      # Clamp to the marker position: when the last marker precedes the
      # first non-comment line there is nothing to drop, and slicing past
      # the marker would duplicate the marker line.
      end_of_header = min(first_event.start(), start_of_full_trace)
      out = out[:end_of_header] + out[start_of_full_trace:]
  return out