# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Facade to access the display-related functionality."""
import multiprocessing
import numpy
import os
import re
import time
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.client.cros import constants, sys_power
from autotest_lib.client.cros.graphics import graphics_utils
from autotest_lib.client.cros.multimedia import facade_resource
from autotest_lib.client.cros.multimedia import image_generator
from telemetry.internal.browser import web_contents
class TimeoutException(Exception):
"""Timeout Exception class."""
pass
_FLAKY_CALL_RETRY_TIMEOUT_SEC = 60
_FLAKY_DISPLAY_CALL_RETRY_DELAY_SEC = 2
_retry_display_call = retry.retry(
(KeyError, error.CmdError),
timeout_min=_FLAKY_CALL_RETRY_TIMEOUT_SEC / 60.0,
delay_sec=_FLAKY_DISPLAY_CALL_RETRY_DELAY_SEC)
class DisplayFacadeNative(object):
"""Facade to access the display-related functionality.
The methods inside this class only accept Python native types.
"""
CALIBRATION_IMAGE_PATH = '/tmp/calibration.svg'
MINIMUM_REFRESH_RATE_EXPECTED = 25.0
DELAY_TIME = 3
def __init__(self, resource):
"""Initializes a DisplayFacadeNative.
@param resource: A FacadeResource object.
"""
self._resource = resource
self._image_generator = image_generator.ImageGenerator()
@facade_resource.retry_chrome_call
def get_display_info(self):
"""Gets the display info from Chrome.system.display API.
@return array of dict for display info.
"""
extension = self._resource.get_extension(
constants.MULTIMEDIA_TEST_EXTENSION)
extension.ExecuteJavaScript('window.__display_info = null;')
extension.ExecuteJavaScript(
"chrome.system.display.getInfo(function(info) {"
"window.__display_info = info;})")
utils.wait_for_value(lambda: (
extension.EvaluateJavaScript("window.__display_info") != None),
expected_value=True)
return extension.EvaluateJavaScript("window.__display_info")
@facade_resource.retry_chrome_call
def get_window_info(self):
"""Gets the current window info from Chrome.system.window API.
@return a dict for the information of the current window.
"""
extension = self._resource.get_extension()
extension.ExecuteJavaScript('window.__window_info = null;')
extension.ExecuteJavaScript(
"chrome.windows.getCurrent(function(info) {"
"window.__window_info = info;})")
utils.wait_for_value(lambda: (
extension.EvaluateJavaScript("window.__window_info") != None),
expected_value=True)
return extension.EvaluateJavaScript("window.__window_info")
def _wait_for_display_options_to_appear(self, tab, display_index,
timeout=16):
"""Waits for option.DisplayOptions to appear.
The function waits until options.DisplayOptions appears or is timed out
after the specified time.
@param tab: the tab where the display options dialog is shown.
@param display_index: index of the display.
@param timeout: time wait for display options appear.
@raise RuntimeError when display_index is out of range
@raise TimeoutException when the operation is timed out.
"""
tab.WaitForJavaScriptExpression(
"typeof options !== 'undefined' &&"
"typeof options.DisplayOptions !== 'undefined' &&"
"typeof options.DisplayOptions.instance_ !== 'undefined' &&"
"typeof options.DisplayOptions.instance_"
" .displays_ !== 'undefined'", timeout)
if not tab.EvaluateJavaScript(
"options.DisplayOptions.instance_.displays_.length > %d"
% (display_index)):
raise RuntimeError('Display index out of range: '
+ str(tab.EvaluateJavaScript(
"options.DisplayOptions.instance_.displays_.length")))
tab.WaitForJavaScriptExpression(
"typeof options.DisplayOptions.instance_"
" .displays_[%(index)d] !== 'undefined' &&"
"typeof options.DisplayOptions.instance_"
" .displays_[%(index)d].id !== 'undefined' &&"
"typeof options.DisplayOptions.instance_"
" .displays_[%(index)d].resolutions !== 'undefined'"
% {'index': display_index}, timeout)
def get_display_modes(self, display_index):
"""Gets all the display modes for the specified display.
The modes are obtained from chrome://settings-frame/display via
telemetry.
@param display_index: index of the display to get modes from.
@return: A list of DisplayMode dicts.
@raise TimeoutException when the operation is timed out.
"""
try:
tab_descriptor = self.load_url('chrome://settings-frame/display')
tab = self._resource.get_tab_by_descriptor(tab_descriptor)
self._wait_for_display_options_to_appear(tab, display_index)
return tab.EvaluateJavaScript(
"options.DisplayOptions.instance_"
" .displays_[%(index)d].resolutions"
% {'index': display_index})
finally:
self.close_tab(tab_descriptor)
def get_available_resolutions(self, display_index):
"""Gets the resolutions from the specified display.
@return a list of (width, height) tuples.
"""
# Start from M38 (refer to http://codereview.chromium.org/417113012),
# a DisplayMode dict contains 'originalWidth'/'originalHeight'
# in addition to 'width'/'height'.
# OriginalWidth/originalHeight is what is supported by the display
# while width/height is what is shown to users in the display setting.
modes = self.get_display_modes(display_index)
if modes:
if 'originalWidth' in modes[0]:
# M38 or newer
# TODO(tingyuan): fix loading image for cases where original
# width/height is different from width/height.
return list(set([(mode['originalWidth'], mode['originalHeight'])
for mode in modes]))
# pre-M38
return [(mode['width'], mode['height']) for mode in modes
if 'scale' not in mode]
def get_first_external_display_index(self):
"""Gets the first external display index.
@return the index of the first external display; False if not found.
"""
# Get the first external and enabled display
for index, display in enumerate(self.get_display_info()):
if display['isEnabled'] and not display['isInternal']:
return index
return False
def set_resolution(self, display_index, width, height, timeout=3):
"""Sets the resolution of the specified display.
@param display_index: index of the display to set resolution for.
@param width: width of the resolution
@param height: height of the resolution
@param timeout: maximal time in seconds waiting for the new resolution
to settle in.
@raise TimeoutException when the operation is timed out.
"""
try:
tab_descriptor = self.load_url('chrome://settings-frame/display')
tab = self._resource.get_tab_by_descriptor(tab_descriptor)
self._wait_for_display_options_to_appear(tab, display_index)
tab.ExecuteJavaScript(
# Start from M38 (refer to CR:417113012), a DisplayMode dict
# contains 'originalWidth'/'originalHeight' in addition to
# 'width'/'height'. OriginalWidth/originalHeight is what is
# supported by the display while width/height is what is
# shown to users in the display setting.
"""
var display = options.DisplayOptions.instance_
.displays_[%(index)d];
var modes = display.resolutions;
for (index in modes) {
var mode = modes[index];
if (mode.originalWidth == %(width)d &&
mode.originalHeight == %(height)d) {
chrome.send('setDisplayMode', [display.id, mode]);
break;
}
}
"""
% {'index': display_index, 'width': width, 'height': height}
)
def _get_selected_resolution():
modes = tab.EvaluateJavaScript(
"""
options.DisplayOptions.instance_
.displays_[%(index)d].resolutions
"""
% {'index': display_index})
for mode in modes:
if mode['selected']:
return (mode['originalWidth'], mode['originalHeight'])
# TODO(tingyuan):
# Support for multiple external monitors (i.e. for chromebox)
end_time = time.time() + timeout
while time.time() < end_time:
r = _get_selected_resolution()
if (width, height) == (r[0], r[1]):
return True
time.sleep(0.1)
raise TimeoutException('Failed to change resolution to %r (%r'
' detected)' % ((width, height), r))
finally:
self.close_tab(tab_descriptor)
@_retry_display_call
def get_external_resolution(self):
"""Gets the resolution of the external screen.
@return The resolution tuple (width, height)
"""
return graphics_utils.get_external_resolution()
def get_internal_resolution(self):
"""Gets the resolution of the internal screen.
@return The resolution tuple (width, height) or None if internal screen
is not available
"""
for display in self.get_display_info():
if display['isInternal']:
bounds = display['bounds']
return (bounds['width'], bounds['height'])
return None
def set_content_protection(self, state):
"""Sets the content protection of the external screen.
@param state: One of the states 'Undesired', 'Desired', or 'Enabled'
"""
connector = self.get_external_connector_name()
graphics_utils.set_content_protection(connector, state)
def get_content_protection(self):
"""Gets the state of the content protection.
@param output: The output name as a string.
@return: A string of the state, like 'Undesired', 'Desired', or 'Enabled'.
False if not supported.
"""
connector = self.get_external_connector_name()
return graphics_utils.get_content_protection(connector)
def get_external_crtc(self):
"""Gets the external crtc.
@return The id of the external crtc."""
return graphics_utils.get_external_crtc()
def get_internal_crtc(self):
"""Gets the internal crtc.
@retrun The id of the internal crtc."""
return graphics_utils.get_internal_crtc()
def get_output_rect(self, output):
"""Gets the size and position of the given output on the screen buffer.
@param output: The output name as a string.
@return A tuple of the rectangle (width, height, fb_offset_x,
fb_offset_y) of ints.
"""
regexp = re.compile(
r'^([-A-Za-z0-9]+)\s+connected\s+(\d+)x(\d+)\+(\d+)\+(\d+)',
re.M)
match = regexp.findall(graphics_utils.call_xrandr())
for m in match:
if m[0] == output:
return (int(m[1]), int(m[2]), int(m[3]), int(m[4]))
return (0, 0, 0, 0)
def take_internal_screenshot(self, path):
"""Takes internal screenshot.
@param path: path to image file.
"""
if utils.is_freon():
self.take_screenshot_crtc(path, self.get_internal_crtc())
else:
output = self.get_internal_connector_name()
box = self.get_output_rect(output)
graphics_utils.take_screenshot_crop_x(path, box)
return output, box # for logging/debugging
def take_external_screenshot(self, path):
"""Takes external screenshot.
@param path: path to image file.
"""
if utils.is_freon():
self.take_screenshot_crtc(path, self.get_external_crtc())
else:
output = self.get_external_connector_name()
box = self.get_output_rect(output)
graphics_utils.take_screenshot_crop_x(path, box)
return output, box # for logging/debugging
def take_screenshot_crtc(self, path, id):
"""Captures the DUT screenshot, use id for selecting screen.
@param path: path to image file.
@param id: The id of the crtc to screenshot.
"""
graphics_utils.take_screenshot_crop(path, crtc_id=id)
return True
def take_tab_screenshot(self, output_path, url_pattern=None):
"""Takes a screenshot of the tab specified by the given url pattern.
@param output_path: A path of the output file.
@param url_pattern: A string of url pattern used to search for tabs.
Default is to look for .svg image.
"""
if url_pattern is None:
# If no URL pattern is provided, defaults to capture the first
# tab that shows SVG image.
url_pattern = '.svg'
tabs = self._resource.get_tabs()
for i in xrange(0, len(tabs)):
if url_pattern in tabs[i].url:
data = tabs[i].Screenshot(timeout=5)
# Flip the colors from BGR to RGB.
data = numpy.fliplr(data.reshape(-1, 3)).reshape(data.shape)
data.tofile(output_path)
break
return True
def toggle_mirrored(self):
"""Toggles mirrored."""
graphics_utils.screen_toggle_mirrored()
return True
def hide_cursor(self):
"""Hides mouse cursor."""
graphics_utils.hide_cursor()
return True
def is_mirrored_enabled(self):
"""Checks the mirrored state.
@return True if mirrored mode is enabled.
"""
return bool(self.get_display_info()[0]['mirroringSourceId'])
def set_mirrored(self, is_mirrored):
"""Sets mirrored mode.
@param is_mirrored: True or False to indicate mirrored state.
@return True if success, False otherwise.
"""
# TODO: Do some experiments to minimize waiting time after toggling.
retries = 3
while self.is_mirrored_enabled() != is_mirrored and retries > 0:
self.toggle_mirrored()
time.sleep(3)
retries -= 1
return self.is_mirrored_enabled() == is_mirrored
def is_display_primary(self, internal=True):
"""Checks if internal screen is primary display.
@param internal: is internal/external screen primary status requested
@return boolean True if internal display is primary.
"""
for info in self.get_display_info():
if info['isInternal'] == internal and info['isPrimary']:
return True
return False
def suspend_resume(self, suspend_time=10):
"""Suspends the DUT for a given time in second.
@param suspend_time: Suspend time in second.
"""
sys_power.do_suspend(suspend_time)
return True
def suspend_resume_bg(self, suspend_time=10):
"""Suspends the DUT for a given time in second in the background.
@param suspend_time: Suspend time in second.
"""
process = multiprocessing.Process(target=self.suspend_resume,
args=(suspend_time,))
process.start()
return True
@_retry_display_call
def get_external_connector_name(self):
"""Gets the name of the external output connector.
@return The external output connector name as a string, if any.
Otherwise, return False.
"""
return graphics_utils.get_external_connector_name()
def get_internal_connector_name(self):
"""Gets the name of the internal output connector.
@return The internal output connector name as a string, if any.
Otherwise, return False.
"""
return graphics_utils.get_internal_connector_name()
def wait_external_display_connected(self, display):
"""Waits for the specified external display to be connected.
@param display: The display name as a string, like 'HDMI1', or
False if no external display is expected.
@return: True if display is connected; False otherwise.
"""
result = utils.wait_for_value(self.get_external_connector_name,
expected_value=display)
return result == display
@facade_resource.retry_chrome_call
def move_to_display(self, display_index):
"""Moves the current window to the indicated display.
@param display_index: The index of the indicated display.
@return True if success.
@raise TimeoutException if it fails.
"""
display_info = self.get_display_info()
if (display_index is False or
display_index not in xrange(0, len(display_info)) or
not display_info[display_index]['isEnabled']):
raise RuntimeError('Cannot find the indicated display')
target_bounds = display_info[display_index]['bounds']
extension = self._resource.get_extension()
# If the area of bounds is empty (here we achieve this by setting
# width and height to zero), the window_sizer will automatically
# determine an area which is visible and fits on the screen.
# For more details, see chrome/browser/ui/window_sizer.cc
# Without setting state to 'normal', if the current state is
# 'minimized', 'maximized' or 'fullscreen', the setting of
# 'left', 'top', 'width' and 'height' will be ignored.
# For more details, see chrome/browser/extensions/api/tabs/tabs_api.cc
extension.ExecuteJavaScript(
"""
var __status = 'Running';
chrome.windows.update(
chrome.windows.WINDOW_ID_CURRENT,
{left: %d, top: %d, width: 0, height: 0,
state: 'normal'},
function(info) {
if (info.left == %d && info.top == %d &&
info.state == 'normal')
__status = 'Done'; });
"""
% (target_bounds['left'], target_bounds['top'],
target_bounds['left'], target_bounds['top'])
)
extension.WaitForJavaScriptExpression(
"__status == 'Done'",
web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT)
return True
def is_fullscreen_enabled(self):
"""Checks the fullscreen state.
@return True if fullscreen mode is enabled.
"""
return self.get_window_info()['state'] == 'fullscreen'
def set_fullscreen(self, is_fullscreen):
"""Sets the current window to full screen.
@param is_fullscreen: True or False to indicate fullscreen state.
@return True if success, False otherwise.
"""
extension = self._resource.get_extension()
if not extension:
raise RuntimeError('Autotest extension not found')
if is_fullscreen:
window_state = "fullscreen"
else:
window_state = "normal"
extension.ExecuteJavaScript(
"""
var __status = 'Running';
chrome.windows.update(
chrome.windows.WINDOW_ID_CURRENT,
{state: '%s'},
function() { __status = 'Done'; });
"""
% window_state)
utils.wait_for_value(lambda: (
extension.EvaluateJavaScript('__status') == 'Done'),
expected_value=True)
return self.is_fullscreen_enabled() == is_fullscreen
def load_url(self, url):
"""Loads the given url in a new tab. The new tab will be active.
@param url: The url to load as a string.
@return a str, the tab descriptor of the opened tab.
"""
return self._resource.load_url(url)
def load_calibration_image(self, resolution):
"""Opens a new tab and loads a full screen calibration
image from the HTTP server.
@param resolution: A tuple (width, height) of resolution.
@return a str, the tab descriptor of the opened tab.
"""
path = self.CALIBRATION_IMAGE_PATH
self._image_generator.generate_image(resolution[0], resolution[1], path)
os.chmod(path, 0644)
tab_descriptor = self.load_url('file://%s' % path)
return tab_descriptor
def load_color_sequence(self, tab_descriptor, color_sequence):
"""Displays a series of colors on full screen on the tab.
tab_descriptor is returned by any open tab API of display facade.
e.g.,
tab_descriptor = load_url('about:blank')
load_color_sequence(tab_descriptor, color)
@param tab_descriptor: Indicate which tab to test.
@param color_sequence: An integer list for switching colors.
@return A list of the timestamp for each switch.
"""
tab = self._resource.get_tab_by_descriptor(tab_descriptor)
color_sequence_for_java_script = (
'var color_sequence = [' +
','.join("'#%06X'" % x for x in color_sequence) +
'];')
# Paints are synchronized to the fresh rate of the screen by
# window.requestAnimationFrame.
tab.ExecuteJavaScript(color_sequence_for_java_script + """
function render(timestamp) {
window.timestamp_list.push(timestamp);
if (window.count < color_sequence.length) {
document.body.style.backgroundColor =
color_sequence[count];
window.count++;
window.requestAnimationFrame(render);
}
}
window.count = 0;
window.timestamp_list = [];
window.requestAnimationFrame(render);
""")
# Waiting time is decided by following concerns:
# 1. MINIMUM_REFRESH_RATE_EXPECTED: the minimum refresh rate
# we expect it to be. Real refresh rate is related to
# not only hardware devices but also drivers and browsers.
# Most graphics devices support at least 60fps for a single
# monitor, and under mirror mode, since the both frames
# buffers need to be updated for an input frame, the refresh
# rate will decrease by half, so here we set it to be a
# little less than 30 (= 60/2) to make it more tolerant.
# 2. DELAY_TIME: extra wait time for timeout.
tab.WaitForJavaScriptExpression(
'window.count == color_sequence.length',
(len(color_sequence) / self.MINIMUM_REFRESH_RATE_EXPECTED)
+ self.DELAY_TIME)
return tab.EvaluateJavaScript("window.timestamp_list")
def close_tab(self, tab_descriptor):
"""Disables fullscreen and closes the tab of the given tab descriptor.
tab_descriptor is returned by any open tab API of display facade.
e.g.,
1.
tab_descriptor = load_url(url)
close_tab(tab_descriptor)
2.
tab_descriptor = load_calibration_image(resolution)
close_tab(tab_descriptor)
@param tab_descriptor: Indicate which tab to be closed.
"""
# set_fullscreen(False) is necessary here because currently there
# is a bug in tabs.Close(). If the current state is fullscreen and
# we call close_tab() without setting state back to normal, it will
# cancel fullscreen mode without changing system configuration, and
# so that the next time someone calls set_fullscreen(True), the
# function will find that current state is already 'fullscreen'
# (though it is not) and do nothing, which will break all the
# following tests.
self.set_fullscreen(False)
self._resource.close_tab(tab_descriptor)
return True