普通文本  |  387行  |  15.12 KB

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import json
import os
import time

import selenium

from autotest_lib.client.bin import utils
from extension_pages import e2e_test_utils
from extension_pages import options


class TestUtils(object):
    """Contains all the helper functions for Chrome mirroring automation."""

    short_wait_secs = 3
    step_timeout_secs = 60
    cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
                  'softirq', 'steal', 'guest', 'guest_nice']
    cpu_idle_fields = ['idle', 'iowait']

    def __init__(self):
        """Constructor"""


    def set_mirroring_options(self, driver, extension_id, settings):
        """Apply all the settings given by the user to the option page.

        @param driver: The chromedriver instance of the test
        @param extension_id: The id of the Cast extension
        @param settings: The settings and information about the test
        """
        options_page = options.OptionsPage(driver, extension_id)
        options_page.open_hidden_options_menu()
        time.sleep(self.short_wait_secs)
        for key in settings.keys():
            options_page.set_value(key, settings[key])


    def start_v2_mirroring_test_utils(
            self, driver, extension_id, receiver_ip, url, fullscreen,
            udp_proxy_server=None, network_profile=None):
        """Use test util page to start mirroring session on specific device.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        @param receiver_ip: The ip of the Eureka dongle to launch the activity
        @param url: The URL to navigate to
        @param fullscreen: click the fullscreen button or not
        @param udp_proxy_server: the address of udp proxy server,
            it should be a http address, http://<ip>:<port>
        @param network_profile: the network profile,
            it should be one of wifi, bad and evil.
        @return True if the function finishes
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        time.sleep(self.short_wait_secs)
        tab_handles = driver.window_handles
        e2e_test_utils_page.receiver_ip_or_name_v2_text_box().set_value(
                receiver_ip)
        e2e_test_utils_page.url_to_open_v2_text_box().set_value(url)
        if udp_proxy_server:
            e2e_test_utils_page.udp_proxy_server_text_box().set_value(
                    udp_proxy_server)
        if network_profile:
            e2e_test_utils_page.network_profile_text_box().set_value(
                    network_profile)
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.open_then_mirror_v2_button().click()
        time.sleep(self.short_wait_secs)
        all_handles = driver.window_handles
        video_handle = [x for x in all_handles if x not in tab_handles].pop()
        driver.switch_to_window(video_handle)
        self.navigate_to_test_url(driver, url, fullscreen)
        return True


    def stop_v2_mirroring_test_utils(self, driver, extension_id):
      """Use test util page to stop a mirroring session on a specific device.

      @param driver: The chromedriver instance
      @param extension_id: The id of the Cast extension
      @param activity_id: The id of the mirroring activity
      """
      e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(driver,
                                                            extension_id)
      e2e_test_utils_page.go_to_page()
      time.sleep(self.short_wait_secs)
      e2e_test_utils_page.stop_v2_mirroring_button().click()


    def start_v2_mirroring_sdk(self, driver, device_ip, url, extension_id):
        """Use SDK to start a mirroring session on a specific device.

        @param driver: The chromedriver instance
        @param device_ip: The IP of the Eureka device
        @param url: The URL to navigate to
        @param extension_id: The id of the Cast extension
        @return True if the function finishes
        @raise RuntimeError for timeouts
        """
        self.set_auto_testing_ip(driver, extension_id, device_ip)
        self.nagviate(driver, url, False)
        time.sleep(self.short_wait_secs)
        driver.execute_script('loadScript()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when initiating mirroring... ...')
        driver.execute_script('startV2Mirroring()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when triggering mirroring... ...')
        return True


    def stop_v2_mirroring_sdk(self, driver, activity_id=None):
        """Use SDK to stop the mirroring activity in Chrome.

        @param driver: The chromedriver instance
        @param activity_id: The id of the mirroring activity
        @raise RuntimeError for timeouts
        """
        driver.execute_script('stopV2Mirroring()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                self.step_timeout_secs)


    def set_auto_testing_ip(self, driver, extension_id, device_ip):
        """Set the auto testing IP on the extension page.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        @param device_ip: The IP of the device to test against
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.execute_script(
                'localStorage["AutoTestingIp"] = "%s";' % device_ip)


    def upload_v2_mirroring_logs(self, driver, extension_id):
        """Upload v2 mirroring logs for the latest mirroring session.

        @param driver: The chromedriver instance of the browser
        @param extension_id: The extension ID of the Cast extension
        @return The report id in crash staging server.
        @raises RuntimeError if an error occurred during uploading
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.upload_v2_mirroring_logs_button().click()
        report_id = self._wait_for_result(
            e2e_test_utils_page.v2_mirroring_logs_scroll_box().get_value,
            'Failed to get v2 mirroring logs')
        if 'Failed to upload logs' in report_id:
          raise RuntimeError('Failed to get v2 mirroring logs')
        return report_id


    def get_chrome_version(self, driver):
        """Return the Chrome version that is being used for running test.

        @param driver: The chromedriver instance
        @return The Chrome version
        """
        get_chrome_version_js = 'return window.navigator.appVersion;'
        app_version = driver.execute_script(get_chrome_version_js)
        for item in app_version.split():
           if 'Chrome/' in item:
              return item.split('/')[1]
        return None


    def get_chrome_revision(self, driver):
        """Return Chrome revision number that is being used for running test.

        @param driver: The chromedriver instance
        @return The Chrome revision number
        """
        get_chrome_revision_js = ('return document.getElementById("version").'
                                  'getElementsByTagName("span")[2].innerHTML;')
        driver.get('chrome://version')
        return driver.execute_script(get_chrome_revision_js)


    def get_extension_id_from_flag(self, extra_flags):
        """Gets the extension ID based on the whitelisted extension id flag.

        @param extra_flags: A string which contains all the extra chrome flags
        @return The ID of the extension. Return None if nothing is found.
        """
        extra_flags_list = extra_flags.split()
        for flag in extra_flags_list:
            if 'whitelisted-extension-id=' in flag:
                return flag.split('=')[1]
        return None


    def navigate_to_test_url(self, driver, url, fullscreen):
        """Navigate to a given URL. Click fullscreen button if needed.

        @param driver: The chromedriver instance
        @param url: The URL of the site to navigate to
        @param fullscreen: True and the video will play in full screen mode.
                           Otherwise, set to False
        """
        driver.get(url)
        driver.refresh()
        if fullscreen:
          self.request_full_screen(driver)


    def request_full_screen(self, driver):
        """Request full screen.

        @param driver: The chromedriver instance
        """
        try:
            time.sleep(self.short_wait_secs)
            driver.find_element_by_id('fsbutton').click()
        except selenium.common.exceptions.NoSuchElementException as error_message:
            print 'Full screen button is not found. ' + str(error_message)


    def set_focus_tab(self, driver, tab_handle):
      """Set the focus on a tab.

      @param driver: The chromedriver instance
      @param tab_handle: The chrome driver handle of the tab
      """
      driver.switch_to_window(tab_handle)
      driver.get_screenshot_as_base64()


    def block_setup_dialog(self, driver, extension_id):
        """Tab cast through the extension.

        @param driver: A chromedriver instance that has the extension loaded.
        @param extension_id: Id of the extension to use.
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        driver.execute_script(
            'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"')


    def close_popup_tabs(self, driver):
        """Close any popup windows the extension might open by default.

        Since we're going to handle the extension ourselves all we need is
        the main browser window with a single tab. The safest way to handle
        the popup however, is to close the currently active tab, so we don't
        mess with chromedrivers ui debugger.

        @param driver: Chromedriver instance.
        @raises Exception If you close the tab associated with
            the ui debugger.
        """
        # TODO: There are several, albeit hacky ways, to handle this popup
        # that might need to change with different versions of the extension
        # until the core issue is resolved. See crbug.com/338399.
        current_tab_handle = driver.current_window_handle
        for handle in driver.window_handles:
            if current_tab_handle != handle:
                try:
                    time.sleep(self.short_wait_secs)
                    driver.switch_to_window(handle)
                    driver.close()
                except:
                    pass
        driver.switch_to_window(current_tab_handle)


    def output_dict_to_file(self, dictionary, file_name,
                            path=None, sort_keys=False):
        """Output a dictionary into a file.

        @param dictionary: The dictionary to be output as JSON
        @param file_name: The name of the file that is being output
        @param path: The path of the file. The default is None
        @param sort_keys: Sort dictionary by keys when output. False by default
        """
        if path is None:
            path = os.path.abspath(os.path.dirname(__file__))
        # if json file exists, read the existing one and append to it
        json_file = os.path.join(path, file_name)
        if os.path.isfile(json_file) and dictionary:
            with open(json_file, 'r') as existing_json_data:
                json_data = json.load(existing_json_data)
            dictionary = dict(json_data.items() + dictionary.items())
        output_json = json.dumps(dictionary, sort_keys=sort_keys)
        with open(json_file, 'w') as file_handler:
            file_handler.write(output_json)


    def compute_cpu_utilization(self, cpu_dict):
        """Generate the upper/lower bound and the average CPU consumption.

        @param cpu_dict: The dictionary that contains CPU usage every sec.
        @returns A dict that contains upper/lower bound and average cpu usage.
        """
        cpu_bound = {}
        cpu_usage = sorted(cpu_dict.values())
        cpu_bound['lower_bound'] = (
                '%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)])
        cpu_bound['upper_bound'] = (
                '%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)])
        cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage)))
        return cpu_bound


    def cpu_usage_interval(self, duration, interval=1):
        """Get the CPU usage over a period of time based on interval.

        @param duration: The duration of getting the CPU usage.
        @param interval: The interval to check the CPU usage. Default is 1 sec.
        @return A dict that contains CPU usage over different time intervals.
        """
        current_time = 0
        cpu_usage = {}
        while current_time < duration:
            pre_times = self._get_system_times()
            time.sleep(interval)
            post_times = self._get_system_times()
            cpu_usage[current_time] = self._get_avg_cpu_usage(
                    pre_times, post_times)
            current_time += interval
        return cpu_usage


    def _get_avg_cpu_usage(self, pre_times, post_times):
        """Calculate the average CPU usage of two different periods of time.

        @param pre_times: The CPU usage information of the start point.
        @param post_times: The CPU usage information of the end point.
        @return Average CPU usage over a time period.
        """
        diff_times = {}
        for field in self.cpu_fields:
            diff_times[field] = post_times[field] - pre_times[field]

        idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
        total_time = sum(diff_times[field] for field in self.cpu_fields)
        return float(total_time - idle_time) / total_time * 100.0


    def _get_system_times(self):
        """Get the CPU information from the system times.

        @return An list with CPU usage of different processes.
        """
        proc_stat = utils.read_file('/proc/stat')
        for line in proc_stat.split('\n'):
            if line.startswith('cpu '):
                times = line[4:].strip().split(' ')
                times = [int(jiffies) for jiffies in times]
                return dict(zip(self.cpu_fields, times))


    def _wait_for_result(self, get_result, error_message):
        """Wait for the result.

        @param get_result: the function to get result.
        @param error_message: the error message in the exception
            if it is failed to get result.
        @return The result.
        @raises RuntimeError if it is failed to get result within
            self.step_timeout_secs.
        """
        start = time.time()
        while (((time.time() - start) < self.step_timeout_secs)
               and not get_result()):
            time.sleep(self.step_timeout_secs/10.0)
        if not get_result():
            raise RuntimeError(error_message)
        return get_result()