# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import os
import time
import selenium
from autotest_lib.client.bin import utils
from extension_pages import e2e_test_utils
from extension_pages import options
class TestUtils(object):
    """Contains all the helper functions for Chrome mirroring automation."""

    # Pause, in seconds, slept between UI steps.
    short_wait_secs = 3
    # Upper bound, in seconds, that _wait_for_result keeps polling.
    step_timeout_secs = 60
    # Names given, in order, to the columns of the "cpu " line of /proc/stat.
    cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
                  'softirq', 'steal', 'guest', 'guest_nice']
    # Subset of cpu_fields that is counted as idle time.
    cpu_idle_fields = ['idle', 'iowait']

    def __init__(self):
        """Constructor"""

    def set_mirroring_options(self, driver, extension_id, settings):
        """Apply all the settings given by the user to the option page.

        @param driver: The chromedriver instance of the test
        @param extension_id: The id of the Cast extension
        @param settings: A dict mapping each option name to the value to set
        """
        options_page = options.OptionsPage(driver, extension_id)
        options_page.open_hidden_options_menu()
        time.sleep(self.short_wait_secs)
        for key, value in settings.items():
            options_page.set_value(key, value)

    def start_v2_mirroring_test_utils(
            self, driver, extension_id, receiver_ip, url, fullscreen,
            udp_proxy_server=None, network_profile=None):
        """Use test util page to start mirroring session on specific device.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        @param receiver_ip: The ip of the Eureka dongle to launch the activity
        @param url: The URL to navigate to
        @param fullscreen: click the fullscreen button or not
        @param udp_proxy_server: the address of udp proxy server,
            it should be a http address, http://<ip>:<port>
        @param network_profile: the network profile,
            it should be one of wifi, bad and evil.
        @return True if the function finishes
        @raise RuntimeError if no new tab shows up after clicking the button
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        time.sleep(self.short_wait_secs)
        # Remember the tabs that exist now so the new one can be identified.
        tab_handles = driver.window_handles
        e2e_test_utils_page.receiver_ip_or_name_v2_text_box().set_value(
                receiver_ip)
        e2e_test_utils_page.url_to_open_v2_text_box().set_value(url)
        if udp_proxy_server:
            e2e_test_utils_page.udp_proxy_server_text_box().set_value(
                    udp_proxy_server)
        if network_profile:
            e2e_test_utils_page.network_profile_text_box().set_value(
                    network_profile)
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.open_then_mirror_v2_button().click()
        time.sleep(self.short_wait_secs)
        new_handles = [handle for handle in driver.window_handles
                       if handle not in tab_handles]
        if not new_handles:
            # Fail with a readable error instead of an IndexError from pop().
            raise RuntimeError('No new tab was opened for mirroring')
        video_handle = new_handles.pop()
        driver.switch_to_window(video_handle)
        self.navigate_to_test_url(driver, url, fullscreen)
        return True

    def stop_v2_mirroring_test_utils(self, driver, extension_id):
        """Use test util page to stop a mirroring session on a specific device.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(driver,
                                                              extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.stop_v2_mirroring_button().click()

    def start_v2_mirroring_sdk(self, driver, device_ip, url, extension_id):
        """Use SDK to start a mirroring session on a specific device.

        @param driver: The chromedriver instance
        @param device_ip: The IP of the Eureka device
        @param url: The URL to navigate to
        @param extension_id: The id of the Cast extension
        @return True if the function finishes
        @raise RuntimeError for timeouts
        """
        self.set_auto_testing_ip(driver, extension_id, device_ip)
        self.navigate_to_test_url(driver, url, False)
        time.sleep(self.short_wait_secs)
        driver.execute_script('loadScript()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when initiating mirroring... ...')
        driver.execute_script('startV2Mirroring()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when triggering mirroring... ...')
        return True

    def stop_v2_mirroring_sdk(self, driver, activity_id=None):
        """Use SDK to stop the mirroring activity in Chrome.

        @param driver: The chromedriver instance
        @param activity_id: The id of the mirroring activity. Currently
            unused; kept so existing callers keep working.
        @raise RuntimeError for timeouts
        """
        driver.execute_script('stopV2Mirroring()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when stopping mirroring... ...')

    def set_auto_testing_ip(self, driver, extension_id, device_ip):
        """Set the auto testing IP on the extension page.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        @param device_ip: The IP of the device to test against
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.execute_script(
                'localStorage["AutoTestingIp"] = "%s";' % device_ip)

    def upload_v2_mirroring_logs(self, driver, extension_id):
        """Upload v2 mirroring logs for the latest mirroring session.

        @param driver: The chromedriver instance of the browser
        @param extension_id: The extension ID of the Cast extension
        @return The report id in crash staging server.
        @raises RuntimeError if an error occurred during uploading
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.upload_v2_mirroring_logs_button().click()
        report_id = self._wait_for_result(
                e2e_test_utils_page.v2_mirroring_logs_scroll_box().get_value,
                'Failed to get v2 mirroring logs')
        # The scroll box text doubles as the error channel of the upload.
        if 'Failed to upload logs' in report_id:
            raise RuntimeError('Failed to get v2 mirroring logs')
        return report_id

    def get_chrome_version(self, driver):
        """Return the Chrome version that is being used for running test.

        @param driver: The chromedriver instance
        @return The Chrome version, or None if navigator.appVersion has no
            "Chrome/<version>" token
        """
        get_chrome_version_js = 'return window.navigator.appVersion;'
        app_version = driver.execute_script(get_chrome_version_js)
        for item in app_version.split():
            if 'Chrome/' in item:
                return item.split('/')[1]
        return None

    def get_chrome_revision(self, driver):
        """Return Chrome revision number that is being used for running test.

        Navigates the current tab to chrome://version to read the value.

        @param driver: The chromedriver instance
        @return The Chrome revision number
        """
        get_chrome_revision_js = ('return document.getElementById("version").'
                                  'getElementsByTagName("span")[2].innerHTML;')
        driver.get('chrome://version')
        return driver.execute_script(get_chrome_revision_js)

    def get_extension_id_from_flag(self, extra_flags):
        """Gets the extension ID based on the whitelisted extension id flag.

        @param extra_flags: A string which contains all the extra chrome flags
        @return The ID of the extension. Return None if nothing is found.
        """
        for flag in extra_flags.split():
            if 'whitelisted-extension-id=' in flag:
                # Split once so that a value containing '=' is not truncated.
                return flag.split('=', 1)[1]
        return None

    def navigate_to_test_url(self, driver, url, fullscreen):
        """Navigate to a given URL. Click fullscreen button if needed.

        @param driver: The chromedriver instance
        @param url: The URL of the site to navigate to
        @param fullscreen: True and the video will play in full screen mode.
            Otherwise, set to False
        """
        driver.get(url)
        driver.refresh()
        if fullscreen:
            self.request_full_screen(driver)

    def request_full_screen(self, driver):
        """Request full screen.

        A missing full screen button is reported on stdout and otherwise
        ignored.

        @param driver: The chromedriver instance
        """
        try:
            time.sleep(self.short_wait_secs)
            driver.find_element_by_id('fsbutton').click()
        except selenium.common.exceptions.NoSuchElementException as error_message:
            # Single-argument call form prints the same text on Python 2 and 3.
            print('Full screen button is not found. ' + str(error_message))

    def set_focus_tab(self, driver, tab_handle):
        """Set the focus on a tab.

        @param driver: The chromedriver instance
        @param tab_handle: The chrome driver handle of the tab
        """
        driver.switch_to_window(tab_handle)
        # The screenshot itself is discarded.
        # NOTE(review): presumably taken only to force the tab to the
        # foreground - confirm.
        driver.get_screenshot_as_base64()

    def block_setup_dialog(self, driver, extension_id):
        """Set the extension flag that blocks the setup auto launch.

        Opens the test utils page and sets
        localStorage["blockChromekeySetupAutoLaunchOnInstall"] to "true".

        @param driver: A chromedriver instance that has the extension loaded.
        @param extension_id: Id of the extension to use.
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        driver.execute_script(
                'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"')

    def close_popup_tabs(self, driver):
        """Close any popup windows the extension might open by default.

        Since we're going to handle the extension ourselves all we need is
        the main browser window with a single tab. Every window other than
        the currently active one is closed on a best-effort basis, then the
        focus is switched back to the originally active window.

        @param driver: Chromedriver instance.
        """
        # TODO: There are several, albeit hacky ways, to handle this popup
        # that might need to change with different versions of the extension
        # until the core issue is resolved. See crbug.com/338399.
        current_tab_handle = driver.current_window_handle
        for handle in driver.window_handles:
            if handle == current_tab_handle:
                continue
            try:
                time.sleep(self.short_wait_secs)
                driver.switch_to_window(handle)
                driver.close()
            except Exception:
                # Best effort: a window that cannot be closed is skipped.
                # Unlike a bare except, KeyboardInterrupt and SystemExit
                # still propagate.
                pass
        driver.switch_to_window(current_tab_handle)

    def output_dict_to_file(self, dictionary, file_name,
                            path=None, sort_keys=False):
        """Output a dictionary into a file.

        If the file already exists and the dictionary is not empty, the
        existing JSON content is merged with the dictionary (the dictionary
        wins on duplicate keys). An empty dictionary overwrites the file.

        @param dictionary: The dictionary to be output as JSON
        @param file_name: The name of the file that is being output
        @param path: The path of the file. The default is None, which means
            the directory that contains this module
        @param sort_keys: Sort dictionary by keys when output. False by default
        """
        if path is None:
            path = os.path.abspath(os.path.dirname(__file__))
        # if json file exists, read the existing one and append to it
        json_file = os.path.join(path, file_name)
        if os.path.isfile(json_file) and dictionary:
            with open(json_file, 'r') as existing_json_data:
                json_data = json.load(existing_json_data)
            # Copy-then-update works on both Python 2 and 3, unlike adding
            # the two items() results, and leaves the caller's dict untouched.
            merged = dict(json_data)
            merged.update(dictionary)
            dictionary = merged
        output_json = json.dumps(dictionary, sort_keys=sort_keys)
        with open(json_file, 'w') as file_handler:
            file_handler.write(output_json)

    def compute_cpu_utilization(self, cpu_dict):
        """Generate the upper/lower bound and the average CPU consumption.

        The bounds are the samples found at 10% and 90% of the sorted list.

        @param cpu_dict: The dictionary that contains CPU usage every sec.
            Must not be empty.
        @returns A dict that contains upper/lower bound and average cpu usage,
            each formatted as a string with two decimals.
        """
        cpu_usage = sorted(cpu_dict.values())
        sample_count = len(cpu_usage)
        lower = cpu_usage[int(sample_count * 10.0 / 100.0)]
        upper = cpu_usage[int(sample_count * 90.0 / 100.0)]
        average = sum(cpu_usage) / float(sample_count)
        return {
            'lower_bound': '%.2f' % lower,
            'upper_bound': '%.2f' % upper,
            'average': '%.2f' % average,
        }

    def cpu_usage_interval(self, duration, interval=1):
        """Get the CPU usage over a period of time based on interval.

        @param duration: The duration of getting the CPU usage.
        @param interval: The interval to check the CPU usage. Default is 1 sec.
        @return A dict that maps the start offset of each interval to the CPU
            usage measured during that interval.
        """
        current_time = 0
        cpu_usage = {}
        while current_time < duration:
            pre_times = self._get_system_times()
            time.sleep(interval)
            post_times = self._get_system_times()
            cpu_usage[current_time] = self._get_avg_cpu_usage(
                    pre_times, post_times)
            current_time += interval
        return cpu_usage

    def _get_avg_cpu_usage(self, pre_times, post_times):
        """Calculate the average CPU usage of two different periods of time.

        @param pre_times: The CPU usage information of the start point.
        @param post_times: The CPU usage information of the end point.
        @return Average CPU usage over a time period, as a percentage.
            0.0 if no time elapsed between the two samples.
        """
        # Fields missing from a sample count as 0, because
        # _get_system_times only names the columns the kernel reported.
        diff_times = dict(
                (field, post_times.get(field, 0) - pre_times.get(field, 0))
                for field in self.cpu_fields)
        idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
        total_time = sum(diff_times.values())
        if total_time == 0:
            # Both samples fell into the same jiffy; avoid dividing by zero.
            return 0.0
        return float(total_time - idle_time) / total_time * 100.0

    def _get_system_times(self):
        """Get the CPU information from the system times.

        @return A dict that maps the names in cpu_fields to the jiffies read
            from the "cpu " line of /proc/stat.
        @raises RuntimeError if /proc/stat has no "cpu " line.
        """
        proc_stat = utils.read_file('/proc/stat')
        for line in proc_stat.split('\n'):
            if line.startswith('cpu '):
                # split() without argument tolerates any run of whitespace.
                times = [int(jiffies) for jiffies in line[4:].split()]
                return dict(zip(self.cpu_fields, times))
        raise RuntimeError('Unable to find the "cpu" line in /proc/stat')

    def _wait_for_result(self, get_result, error_message):
        """Wait for the result.

        Polls get_result every step_timeout_secs / 10 seconds and returns the
        first truthy value it produces.

        @param get_result: the function to get result.
        @param error_message: the error message in the exception
            if it is failed to get result.
        @return The result.
        @raises RuntimeError if it is failed to get result within
            self.step_timeout_secs.
        """
        deadline = time.time() + self.step_timeout_secs
        # Keep the polled value so that the value that was checked is the
        # value that is returned; get_result() may change between calls.
        result = get_result()
        while not result and time.time() < deadline:
            time.sleep(self.step_timeout_secs / 10.0)
            result = get_result()
        if not result:
            raise RuntimeError(error_message)
        return result