# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
import urllib
import urllib2
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.cros.input_playback import input_playback
class touch_playback_test_base(test.test):
    """Base class for touch tests involving playback."""

    version = 1

    # Location of the inputcontrol script on the device, when present.
    _INPUTCONTROL = '/opt/google/input/inputcontrol'

    # Board-name suffix that is removed to obtain the platform name.
    _CHEETS_SUFFIX = '-cheets'

    # Input types whose gesture files are keyed by platform and hw_id.
    _HW_ID_INPUT_TYPES = ['touchpad', 'touchscreen', 'stylus']

    @property
    def _has_touchpad(self):
        """True if device under test has a touchpad; else False."""
        return self.player.has('touchpad')

    @property
    def _has_touchscreen(self):
        """True if device under test has a touchscreen; else False."""
        return self.player.has('touchscreen')

    @property
    def _has_mouse(self):
        """True if device under test has or emulates a USB mouse; else False."""
        return self.player.has('mouse')

    def warmup(self, mouse_props=None):
        """Test setup.

        Instantiate player object to find touch devices, if any.
        These devices can be used for playback later.
        Emulate a USB mouse if a property file is provided.
        Check if the inputcontrol script is available on the disk.

        @param mouse_props: optional property file for a mouse to emulate.
                            Created using 'evemu-describe /dev/input/X'.

        """
        self.player = input_playback.InputPlayback()
        if mouse_props:
            self.player.emulate(input_type='mouse', property_file=mouse_props)
        self.player.find_connected_inputs()

        self._autotest_ext = None
        self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL)

        self._platform = utils.get_board()
        # Only strip the suffix when it really is a suffix; slicing on a mere
        # substring match would chop unrelated characters off the board name.
        if self._platform.endswith(self._CHEETS_SUFFIX):
            self._platform = self._platform[:-len(self._CHEETS_SUFFIX)]

    def _find_test_files(self, input_type, gestures):
        """Determine where the playback gesture files for this test are.

        Expected file format is: <boardname>_<input type>_<hwid>_<gesture name>
            e.g. samus_touchpad_164.17_scroll_down

        @param input_type: device type, e.g. 'touchpad'
        @param gestures: list of gesture name strings used in filename

        @returns: None if not all files are found.  Dictionary of filepaths if
                  they are found, indexed by gesture names as given.
        @raises: error.TestError if no device is found or if device should have
                 a hw_id but does not.

        """
        if not self.player.has(input_type):
            raise error.TestError('Device does not have a %s!' % input_type)

        if input_type in self._HW_ID_INPUT_TYPES:
            hw_id = self.player.devices[input_type].hw_id
            if not hw_id:
                raise error.TestError('No valid hw_id for %s!' % input_type)
            filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id)
        else:
            # Other devices are identified by their device name instead.
            device_name = self.player.devices[input_type].name
            filename_fmt = '%s_%s' % (device_name, input_type)

        filepaths = {}
        for gesture in gestures:
            filename = '%s_%s' % (filename_fmt, gesture)
            filepath = self._download_remote_test_file(filename, input_type)
            if not filepath:
                # A single missing file invalidates the whole set.
                logging.info('Did not find files for this device!')
                return None
            filepaths[gesture] = filepath

        return filepaths

    def _find_test_files_from_directions(self, input_type, fmt_str, directions):
        """Find gesture files given a list of directions and name format.

        @param input_type: device type, e.g. 'touchpad'
        @param fmt_str: format string for filename, e.g. 'scroll-%s'
        @param directions: list of directions for fmt_string

        @returns: Empty dictionary if not all files are found.  Dictionary of
                  filepaths if they are found, indexed by directions as given.
        @raises: error.TestError if no hw_id is found.

        """
        gestures = [fmt_str % d for d in directions]
        temp_filepaths = self._find_test_files(input_type, gestures)

        filepaths = {}
        if temp_filepaths:
            # Re-key the results by direction rather than by gesture name.
            filepaths = {d: temp_filepaths[fmt_str % d] for d in directions}

        return filepaths

    def _download_remote_test_file(self, filename, input_type):
        """Download a file from the remote touch playback folder.

        @param filename: string of filename
        @param input_type: device type, e.g. 'touchpad'

        @returns: Path to local file or None if file is not found.

        """
        REMOTE_STORAGE_URL = ('https://storage.googleapis.com/'
                              'chromiumos-test-assets-public/touch_playback')
        filename = urllib.quote(filename)

        if input_type in self._HW_ID_INPUT_TYPES:
            url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename)
        else:
            url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename)
        local_file = os.path.join(self.bindir, filename)

        logging.info('Looking for %s', url)
        try:
            file_utils.download_file(url, local_file)
        except urllib2.URLError as e:
            logging.info('File download failed!')
            # Log the exception itself: a plain URLError carries 'reason' but
            # no 'msg' attribute, so reading e.msg would raise AttributeError.
            logging.debug('%s', e)
            return None

        return local_file

    def _emulate_mouse(self, property_file=None):
        """Emulate a mouse with the given property file.

        player will use default mouse if no file is provided.

        @param property_file: optional property file for the mouse to emulate.

        @raises: error.TestError if no mouse is found after emulation.

        """
        self.player.emulate(input_type='mouse', property_file=property_file)
        self.player.find_connected_inputs()
        if not self._has_mouse:
            raise error.TestError('Mouse emulation failed!')

    def _playback(self, filepath, touch_type='touchpad'):
        """Playback a given input file on the given input.

        @param filepath: path of the file to play back.
        @param touch_type: device type to play back on, e.g. 'touchpad'

        """
        self.player.playback(filepath, touch_type)

    def _blocking_playback(self, filepath, touch_type='touchpad'):
        """Playback a given input file on the given input; block until done.

        @param filepath: path of the file to play back.
        @param touch_type: device type to play back on, e.g. 'touchpad'

        """
        self.player.blocking_playback(filepath, touch_type)

    def _set_touch_setting_by_inputcontrol(self, setting, value):
        """Set a given touch setting the given value by inputcontrol.

        @param setting: Name of touch setting, e.g. 'tapclick'.
        @param value: True for enabled, False for disabled.

        """
        cmd_value = 1 if value else 0
        utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value))
        logging.info('%s turned %s.', setting, 'on' if value else 'off')

    def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting,
                           value):
        """Set a given touch setting the given value.

        The inputcontrol script is used when it exists on disk; otherwise the
        autotest extension is used if one has been set.

        @param inputcontrol_setting: Name of touch setting for the inputcontrol
                                     script, e.g. 'tapclick'.
        @param autotest_ext_setting: Name of touch setting for the autotest
                                     extension, e.g. 'TapToClick'.
        @param value: True for enabled, False for disabled.

        @raises: error.TestFail if neither mechanism is available.

        """
        if self._has_inputcontrol:
            self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value)
        elif self._autotest_ext is not None:
            # The Python bool is rendered as a lowercase JavaScript literal.
            self._autotest_ext.EvaluateJavaScript(
                    'chrome.autotestPrivate.set%s(%s);'
                    % (autotest_ext_setting, ("%s" % value).lower()))
            # TODO: remove this sleep once checking for value is available.
            time.sleep(1)
        else:
            raise error.TestFail('Both inputcontrol and the autotest '
                                 'extension are not availble.')

    def _set_australian_scrolling(self, value):
        """Set australian scrolling to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('australian_scrolling', 'NaturalScroll', value)

    def _set_tap_to_click(self, value):
        """Set tap-to-click to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('tapclick', 'TapToClick', value)

    def _set_tap_dragging(self, value):
        """Set tap dragging to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('tapdrag', 'TapDragging', value)

    def _set_autotest_ext(self, ext):
        """Set the autotest extension.

        @param ext: the autotest extension object.

        """
        self._autotest_ext = ext

    def _open_test_page(self, cr, filename='test_page.html'):
        """Prepare test page for testing.  Set self._tab with page.

        @param cr: chrome.Chrome() object
        @param filename: name of file in self.bindir to open

        """
        self._test_page = TestPage(cr, self.bindir, filename)
        self._tab = self._test_page._tab

    def _open_events_page(self, cr):
        """Open the test events page.  Set self._events with EventsPage class.

        Also set self._tab as this page and self.bindir as the http server dir.

        @param cr: chrome.Chrome() object

        """
        self._events = EventsPage(cr, self.bindir)
        self._tab = self._events._tab

    def _center_cursor(self):
        """Playback mouse movement to center cursor.

        Requires that self._emulate_mouse() has been called.

        """
        self.player.blocking_playback_of_default_file(
                'mouse_center_cursor_gesture', input_type='mouse')

    def _get_kernel_events_recorder(self, input_type):
        """Return a kernel event recording object for the given input type.

        @param input_type: device type, e.g. 'touchpad'

        @returns: KernelEventsRecorder instance.

        """
        node = self.player.devices[input_type].node
        return KernelEventsRecorder(node)

    def cleanup(self):
        """Close the player created in warmup()."""
        self.player.close()
class KernelEventsRecorder(object):
    """Object to record kernel events for a particular device."""

    def __init__(self, node):
        """Setup to record future evtest output for this node.

        @param node: the device node to pass to evtest.

        """
        self.node = node
        # evtest output is collected in this temporary file.
        self.fh = tempfile.NamedTemporaryFile()
        self.evtest_process = None

    def start(self):
        """Start recording events.

        Launches evtest on the node with its stdout redirected to the
        temporary file, then polls until the text "interrupt to exit" shows
        up in that file.

        """
        self.evtest_process = subprocess.Popen(
                ['evtest', self.node], stdout=self.fh)

        # Wait until the initial output has finished before returning.
        def find_exit():
            """Polling function for end of output."""
            interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' %
                             self.fh.name)
            line_count = utils.run(interrupt_cmd).stdout.strip()
            return line_count != '0'
        utils.poll_for_condition(find_exit)

    def clear(self):
        """Clear previous events.

        Stops any running recording and replaces the temporary file with a
        fresh one.  Recording is not restarted here.

        """
        self.stop()
        self.fh.close()
        self.fh = tempfile.NamedTemporaryFile()

    def stop(self):
        """Stop recording events."""
        if self.evtest_process:
            self.evtest_process.kill()
            # Reap the killed child so it does not linger as a zombie.
            self.evtest_process.wait()
            self.evtest_process = None

    def get_recorded_events(self):
        """Get the evtest output since object was created.

        @returns: the whole content of the current temporary file.

        """
        self.fh.seek(0)
        events = self.fh.read()
        return events

    def log_recorded_events(self):
        """Save recorded events into logs."""
        events = self.get_recorded_events()
        logging.info('Kernel events seen:\n%s', events)

    def get_last_event_timestamp(self, filter_str=''):
        """Return the timestamp of the last event since recording started.

        Events are in the form "Event: time <epoch time>, <info>\n"

        @param filter_str: a regex string to match to the <info> section.

        @returns: float timestamp of the last matching event.
        @raises: error.TestError if no matching event is found.

        """
        events = self.get_recorded_events()
        findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str,
                             events, re.MULTILINE)
        if not findall:
            # Dump what was recorded to help debug the missing match.
            self.log_recorded_events()
            raise error.TestError('Could not find any kernel timestamps!'
                                  '  Filter: %s' % filter_str)
        return float(findall[-1])

    def close(self):
        """Clean up this class."""
        self.stop()
        self.fh.close()
class TestPage(object):
    """Wrapper around a Telemetry tab for utility functions.

    Provides functions such as reload and setting scroll height on page.

    """
    # Scroll offset used as the "default" starting position on the page.
    _DEFAULT_SCROLL = 5000

    def __init__(self, cr, httpdir, filename):
        """Open a given test page in the given httpdir.

        @param cr: chrome.Chrome() object
        @param httpdir: the directory to use for SetHTTPServerDirectories
        @param filename: path to the file to open, relative to httpdir

        """
        cr.browser.platform.SetHTTPServerDirectories(httpdir)
        self._tab = cr.browser.tabs[0]
        self._tab.Navigate(cr.browser.platform.http_server.UrlOf(
                os.path.join(httpdir, filename)))
        self.wait_for_page_ready()

    def reload_page(self):
        """Reloads test page."""
        self._tab.Navigate(self._tab.url)
        self.wait_for_page_ready()

    def wait_for_page_ready(self):
        """Wait for a variable pageReady on the test page to be true.

        Presupposes that a pageReady variable exists on this page.

        @raises error.TestError if page is not ready after timeout.

        """
        self._tab.WaitForDocumentReadyStateToBeComplete()
        utils.poll_for_condition(
                lambda: self._tab.EvaluateJavaScript('pageReady'),
                exception=error.TestError('Test page is not ready!'))

    def expand_page(self):
        """Expand the page to be very large, to allow scrolling."""
        # The size is filled in now; the literal '%s' is left in place so the
        # same command can be formatted once per dimension below.
        cmd = 'document.body.style.%s = %d+"px"' % (
                '%s', self._DEFAULT_SCROLL * 5)
        self._tab.ExecuteJavaScript(cmd % 'width')
        self._tab.ExecuteJavaScript(cmd % 'height')

    def set_scroll_position(self, value, scroll_vertical=True):
        """Set scroll position to given value.

        @param value: the scroll offset to assign.
        @param scroll_vertical: True for vertical scroll,
                                False for horizontal Scroll.

        """
        if scroll_vertical:
            self._tab.ExecuteJavaScript(
                    'document.body.scrollTop=%s' % value)
        else:
            self._tab.ExecuteJavaScript(
                    'document.body.scrollLeft=%s' % value)

    def set_default_scroll_position(self, scroll_vertical=True):
        """Set scroll position of page to default.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal Scroll.
        @raise: TestError if page is not set to default scroll position

        """
        total_tries = 2
        for i in xrange(total_tries):
            try:
                self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical)
                self.wait_for_default_scroll_position(scroll_vertical)
            except error.TestError:
                if i == total_tries - 1:
                    pos = self.get_scroll_position(scroll_vertical)
                    logging.error('SCROLL POSITION: %s', pos)
                    # Bare raise keeps the original traceback intact.
                    raise
            else:
                break

    def get_scroll_position(self, scroll_vertical=True):
        """Return current scroll position of page.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal Scroll.

        @returns: the scroll offset on the chosen axis, as an int.

        """
        if scroll_vertical:
            return int(self._tab.EvaluateJavaScript('document.body.scrollTop'))
        else:
            return int(self._tab.EvaluateJavaScript('document.body.scrollLeft'))

    def wait_for_default_scroll_position(self, scroll_vertical=True):
        """Wait for page to be at the default scroll position.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.

        @raise: TestError if page does not reach the default scroll position.

        """
        utils.poll_for_condition(
                lambda: self.get_scroll_position(
                        scroll_vertical) == self._DEFAULT_SCROLL,
                exception=error.TestError('Page not set to default scroll!'))

    def wait_for_scroll_position_to_settle(self, scroll_vertical=True):
        """Wait for page to move and then stop moving.

        @param scroll_vertical: True for Vertical scroll and
                                False for horizontal scroll.

        @raise: TestError if page either does not move or does not stop moving.

        """
        # Wait until page starts moving.
        utils.poll_for_condition(
                lambda: self.get_scroll_position(
                        scroll_vertical) != self._DEFAULT_SCROLL,
                exception=error.TestError('No scrolling occurred!'), timeout=30)

        # Wait until page has stopped moving.
        self._previous = self._DEFAULT_SCROLL

        def _movement_stopped():
            """Polling function: True once two successive reads agree."""
            # Read the same axis that was checked above; reading the other
            # axis would report "stopped" while the page is still moving.
            current = self.get_scroll_position(scroll_vertical)
            result = current == self._previous
            self._previous = current
            return result

        utils.poll_for_condition(
                _movement_stopped, sleep_interval=1,
                exception=error.TestError('Page did not stop moving!'),
                timeout=30)

    def get_page_width(self):
        """Return window.innerWidth for this page."""
        return int(self._tab.EvaluateJavaScript('window.innerWidth'))
class EventsPage(TestPage):
    """Functions to monitor input events on the DUT, as seen by a webpage.

    A subclass of TestPage which uses and interacts with a specific page.

    """

    def __init__(self, cr, httpdir):
        """Open the website and save the tab in self._tab.

        The events page is copied from this module's directory into httpdir
        before it is opened.

        @param cr: chrome.Chrome() object
        @param httpdir: the directory to use for SetHTTPServerDirectories

        """
        page_name = 'touch_events_test_page.html'
        module_dir = os.path.dirname(os.path.realpath(__file__))
        source_path = os.path.join(module_dir, page_name)
        served_path = os.path.join(httpdir, page_name)
        shutil.copyfile(source_path, served_path)

        super(EventsPage, self).__init__(cr, httpdir, page_name)

    def clear_previous_events(self):
        """Wipe the test page back to its original state."""
        for statement in ('pageReady = false', 'clearPreviousEvents()'):
            self._tab.ExecuteJavaScript(statement)
        self.wait_for_page_ready()

    def get_events_log(self):
        """Return the event log from the test page."""
        event_log = self._tab.EvaluateJavaScript('eventLog')
        return event_log

    def log_events(self):
        """Put the test page's event log into logging.info."""
        logging.info('EVENTS LOG:')
        event_log = self.get_events_log()
        logging.info(event_log)

    def get_time_of_last_event(self):
        """Return the timestamp of the last seen event (if any)."""
        last_event_time = self._tab.EvaluateJavaScript('timeOfLastEvent')
        return last_event_time

    def get_event_count(self):
        """Return the number of events that the test page has seen."""
        event_count = self._tab.EvaluateJavaScript('eventCount')
        return event_count

    def get_scroll_delta(self, is_vertical):
        """Return the net scrolling the test page has seen.

        @param is_vertical: True for vertical scrolling; False for horizontal.

        """
        if is_vertical:
            axis = 'y'
        else:
            axis = 'x'
        return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis)

    def get_click_count(self):
        """Return the number of clicks the test page has seen."""
        click_count = self._tab.EvaluateJavaScript('clickCount')
        return click_count

    def wait_for_events_to_complete(self, delay_secs=1, timeout=60):
        """Wait until test page stops seeing events for delay_secs seconds.

        @param delay_secs: the polling frequency in seconds.
        @param timeout: the number of seconds to wait for events to complete.

        @raises: error.TestError if no events occurred.
        @raises: error.TestError if events did not stop after timeout seconds.

        """
        # Starts at -1 so the first poll can never look like "no change".
        self._tmp_previous_event_count = -1

        def _events_stopped_coming():
            """Polling function: True once the nonzero count stops growing."""
            count_now = self.get_event_count()
            growth = count_now - self._tmp_previous_event_count
            self._tmp_previous_event_count = count_now
            return count_now != 0 and growth == 0

        try:
            utils.poll_for_condition(
                    _events_stopped_coming, exception=error.TestError(),
                    sleep_interval=delay_secs, timeout=timeout)
        except error.TestError:
            # Tell "events kept arriving" apart from "nothing ever arrived".
            if self._tmp_previous_event_count != 0:
                self.log_events()
                raise error.TestError('Touch events did not stop!')
            raise error.TestError('No touch event was seen!')

    def set_prevent_defaults(self, value):
        """Set whether to allow default event actions to go through.

        E.g. if this is True, a two finger horizontal scroll will not actually
        produce history navigation on the browser.

        @param value: True for prevent defaults; False to allow them.

        """
        # Render the Python bool as a lowercase JavaScript literal.
        self._tab.ExecuteJavaScript(
                'preventDefaults = %s;' % str(value).lower())