# Plain text  |  644 lines  |  21.41 KB

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
import urllib
import urllib2

from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.cros.input_playback import input_playback


class touch_playback_test_base(test.test):
    """Base class for touch tests involving playback.

    warmup() creates self.player (an input_playback.InputPlayback) and
    records the platform name; the helper methods below locate gesture
    files, play them back and toggle touch settings.

    """
    version = 1

    # Location of the inputcontrol script; warmup() checks whether it exists.
    _INPUTCONTROL = '/opt/google/input/inputcontrol'


    @property
    def _has_touchpad(self):
        """True if device under test has a touchpad; else False."""
        return self.player.has('touchpad')


    @property
    def _has_touchscreen(self):
        """True if device under test has a touchscreen; else False."""
        return self.player.has('touchscreen')


    @property
    def _has_mouse(self):
        """True if device under test has or emulates a USB mouse; else False."""
        return self.player.has('mouse')


    def warmup(self, mouse_props=None):
        """Test setup.

        Instantiate player object to find touch devices, if any.
        These devices can be used for playback later.
        Emulate a USB mouse if a property file is provided.
        Check if the inputcontrol script is available on the disk.

        @param mouse_props: optional property file for a mouse to emulate.
                            Created using 'evemu-describe /dev/input/X'.

        """
        self.player = input_playback.InputPlayback()
        if mouse_props:
            self.player.emulate(input_type='mouse', property_file=mouse_props)
        self.player.find_connected_inputs()

        self._autotest_ext = None
        self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL)
        self._platform = utils.get_board()
        # Drop a trailing 'cheets' marker from the board name.  Only strip
        # when it really is the suffix: a plain substring test followed by a
        # fixed-length slice would mangle names where 'cheets' appears
        # elsewhere.
        for suffix in ('-cheets', '_cheets'):
            if self._platform.endswith(suffix):
                self._platform = self._platform[:-len(suffix)]
                break


    def _find_test_files(self, input_type, gestures):
        """Determine where the playback gesture files for this test are.

        Expected file format is: <boardname>_<input type>_<hwid>_<gesture name>
            e.g. samus_touchpad_164.17_scroll_down
        For any other input type the name is built from the device name:
            <device name>_<input type>_<gesture name>

        @param input_type: device type, e.g. 'touchpad'
        @param gestures: list of gesture name strings used in filename

        @returns: None if not all files are found.  Dictionary of filepaths if
                  they are found, indexed by gesture names as given.
        @raises: error.TestError if no device is found or if device should have
                 a hw_id but does not.

        """
        if not self.player.has(input_type):
            raise error.TestError('Device does not have a %s!' % input_type)

        if input_type in ['touchpad', 'touchscreen', 'stylus']:
            hw_id = self.player.devices[input_type].hw_id
            if not hw_id:
                raise error.TestError('No valid hw_id for %s!' % input_type)
            filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id)

        else:
            device_name = self.player.devices[input_type].name
            filename_fmt = '%s_%s' % (device_name, input_type)

        filepaths = {}
        for gesture in gestures:
            filename = '%s_%s' % (filename_fmt, gesture)
            filepath = self._download_remote_test_file(filename, input_type)
            # One missing file means the whole set is unusable.
            if not filepath:
                logging.info('Did not find files for this device!')
                return None

            filepaths[gesture] = filepath

        return filepaths


    def _find_test_files_from_directions(self, input_type, fmt_str, directions):
        """Find gesture files given a list of directions and name format.

        @param input_type: device type, e.g. 'touchpad'
        @param fmt_str: format string for filename, e.g. 'scroll-%s'
        @param directions: list of directions for fmt_string

        @returns: Empty dictionary if not all files are found.  Dictionary of
                  filepaths if they are found, indexed by directions as given.
        @raises: error.TestError if no device is found or if device should have
                 a hw_id but does not (see _find_test_files).

        """
        gestures = [fmt_str % d for d in directions]
        temp_filepaths = self._find_test_files(input_type, gestures)

        filepaths = {}
        if temp_filepaths:
            # Re-key the result by direction instead of by gesture name.
            filepaths = {d: temp_filepaths[fmt_str % d] for d in directions}

        return filepaths


    def _download_remote_test_file(self, filename, input_type):
        """Download a file from the remote touch playback folder.

        The file is saved in self.bindir under its URL-quoted name.

        @param filename: string of filename
        @param input_type: device type, e.g. 'touchpad'

        @returns: Path to local file or None if file is not found.

        """
        REMOTE_STORAGE_URL = ('https://storage.googleapis.com/'
                              'chromiumos-test-assets-public/touch_playback')
        filename = urllib.quote(filename)

        if input_type in ['touchpad', 'touchscreen', 'stylus']:
            url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename)
        else:
            url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename)
        local_file = os.path.join(self.bindir, filename)

        logging.info('Looking for %s', url)
        try:
            file_utils.download_file(url, local_file)
        except urllib2.URLError as e:
            logging.info('File download failed!')
            # Log the exception itself: only HTTPError carries a .msg
            # attribute, so reading e.msg on a plain URLError would raise
            # AttributeError instead of reporting the missing file.
            logging.debug('%s', e)
            return None

        return local_file


    def _emulate_mouse(self, property_file=None):
        """Emulate a mouse with the given property file.

        player will use default mouse if no file is provided.

        @param property_file: optional property file for the mouse to emulate.

        @raises: error.TestError if no mouse is found after emulation.

        """
        self.player.emulate(input_type='mouse', property_file=property_file)
        self.player.find_connected_inputs()
        if not self._has_mouse:
            raise error.TestError('Mouse emulation failed!')


    def _playback(self, filepath, touch_type='touchpad'):
        """Playback a given input file on the given input."""
        self.player.playback(filepath, touch_type)


    def _blocking_playback(self, filepath, touch_type='touchpad'):
        """Playback a given input file on the given input; block until done."""
        self.player.blocking_playback(filepath, touch_type)


    def _set_touch_setting_by_inputcontrol(self, setting, value):
        """Set a given touch setting the given value by inputcontrol.

        @param setting: Name of touch setting, e.g. 'tapclick'.
        @param value: True for enabled, False for disabled.

        """
        # The script is invoked with 1 or 0 rather than True/False.
        cmd_value = 1 if value else 0
        utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value))
        logging.info('%s turned %s.', setting, 'on' if value else 'off')


    def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting,
                           value):
        """Set a given touch setting the given value.

        The inputcontrol script is used when it is present; otherwise the
        autotest extension is used, if one was given via _set_autotest_ext.

        @param inputcontrol_setting: Name of touch setting for the inputcontrol
                                     script, e.g. 'tapclick'.
        @param autotest_ext_setting: Name of touch setting for the autotest
                                     extension, e.g. 'TapToClick'.
        @param value: True for enabled, False for disabled.

        @raises: error.TestFail if neither mechanism is available.

        """
        if self._has_inputcontrol:
            self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value)
        elif self._autotest_ext is not None:
            # Python's True/False become the JavaScript literals true/false.
            self._autotest_ext.EvaluateJavaScript(
                    'chrome.autotestPrivate.set%s(%s);'
                    % (autotest_ext_setting, ("%s" % value).lower()))
            # TODO: remove this sleep once checking for value is available.
            time.sleep(1)
        else:
            raise error.TestFail('Both inputcontrol and the autotest '
                                 'extension are not availble.')


    def _set_australian_scrolling(self, value):
        """Set australian scrolling to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('australian_scrolling', 'NaturalScroll', value)


    def _set_tap_to_click(self, value):
        """Set tap-to-click to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('tapclick', 'TapToClick', value)


    def _set_tap_dragging(self, value):
        """Set tap dragging to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('tapdrag', 'TapDragging', value)


    def _set_autotest_ext(self, ext):
        """Set the autotest extension.

        @param ext: the autotest extension object.

        """
        self._autotest_ext = ext


    def _open_test_page(self, cr, filename='test_page.html'):
        """Prepare test page for testing.  Set self._tab with page.

        Also sets self._test_page to the TestPage wrapper.

        @param cr: chrome.Chrome() object
        @param filename: name of file in self.bindir to open

        """
        self._test_page = TestPage(cr, self.bindir, filename)
        self._tab = self._test_page._tab


    def _open_events_page(self, cr):
        """Open the test events page.  Set self._events with EventsPage class.

        Also set self._tab as this page and self.bindir as the http server dir.

        @param cr: chrome.Chrome() object

        """
        self._events = EventsPage(cr, self.bindir)
        self._tab = self._events._tab


    def _center_cursor(self):
        """Playback mouse movement to center cursor.

        Requires that self._emulate_mouse() has been called.

        """
        self.player.blocking_playback_of_default_file(
                'mouse_center_cursor_gesture', input_type='mouse')


    def _get_kernel_events_recorder(self, input_type):
        """Return a kernel event recording object for the given input type.

        @param input_type: device type, e.g. 'touchpad'

        @returns: KernelEventsRecorder instance.

        """
        node = self.player.devices[input_type].node
        return KernelEventsRecorder(node)


    def cleanup(self):
        """Close the player created in warmup(), if there is one."""
        # warmup() may have failed before self.player was assigned; do not
        # hide that failure behind an AttributeError raised from here.
        player = getattr(self, 'player', None)
        if player is not None:
            player.close()


class KernelEventsRecorder(object):
    """Object to record kernel events for a particular device.

    evtest output for the device node is written to a temporary file, which
    the getter methods read back.

    """

    def __init__(self, node):
        """Setup to record future evtest output for this node.

        @param node: the device node passed to evtest as its argument.

        """
        self.node = node
        self.fh = tempfile.NamedTemporaryFile()
        self.evtest_process = None


    def start(self):
        """Start recording events.

        Launches evtest with its stdout directed to the temporary file and
        returns once evtest's initial output has been written.

        """
        self.evtest_process = subprocess.Popen(
                ['evtest', self.node], stdout=self.fh)

        # Wait until the initial output has finished before returning.
        def find_exit():
            """Polling function for end of output."""
            interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' %
                             self.fh.name)
            line_count = utils.run(interrupt_cmd).stdout.strip()
            return line_count != '0'
        utils.poll_for_condition(find_exit)


    def clear(self):
        """Clear previous events.

        Recording is stopped and a fresh temporary file replaces the old one;
        start() must be called again to record further events.

        """
        self.stop()
        self.fh.close()
        self.fh = tempfile.NamedTemporaryFile()


    def stop(self):
        """Stop recording events."""
        if self.evtest_process:
            self.evtest_process.kill()
            self.evtest_process = None


    def get_recorded_events(self):
        """Get the evtest output since object was created or last cleared.

        @returns: the full contents of the temporary output file.

        """
        self.fh.seek(0)
        events = self.fh.read()
        return events


    def log_recorded_events(self):
        """Save recorded events into logs."""
        events = self.get_recorded_events()
        logging.info('Kernel events seen:\n%s', events)


    def get_last_event_timestamp(self, filter_str=''):
        """Return the timestamp of the last event since recording started.

        Events are in the form "Event: time <epoch time>, <info>\n"

        @param filter_str: a regex string to match to the <info> section.

        @returns: float timestamp of the last matching event.
        @raises: error.TestError if no event matches; the recorded events are
                 logged first.

        """
        events = self.get_recorded_events()
        # Capture the time field of each line whose <info> part matches.
        timestamps = re.findall(r' time (.*?), [^\n]*?%s' % filter_str,
                                events, re.MULTILINE)
        if not timestamps:
            self.log_recorded_events()
            raise error.TestError('Could not find any kernel timestamps!'
                                  '  Filter: %s' % filter_str)
        return float(timestamps[-1])


    def close(self):
        """Clean up this class."""
        self.stop()
        self.fh.close()


class TestPage(object):
    """Wrapper around a Telemetry tab for utility functions.

    Provides functions such as reload and setting scroll height on page.

    """
    # Scroll offset that the helpers below treat as the "default" position.
    _DEFAULT_SCROLL = 5000

    def __init__(self, cr, httpdir, filename):
        """Open a given test page in the given httpdir.

        @param cr: chrome.Chrome() object
        @param httpdir: the directory to use for SetHTTPServerDirectories
        @param filename: path to the file to open, relative to httpdir

        """
        cr.browser.platform.SetHTTPServerDirectories(httpdir)
        self._tab = cr.browser.tabs[0]
        self._tab.Navigate(cr.browser.platform.http_server.UrlOf(
                os.path.join(httpdir, filename)))
        self.wait_for_page_ready()


    def reload_page(self):
        """Reloads test page."""
        self._tab.Navigate(self._tab.url)
        self.wait_for_page_ready()


    def wait_for_page_ready(self):
        """Wait for a variable pageReady on the test page to be true.

        Presupposes that a pageReady variable exists on this page.

        @raises error.TestError if page is not ready after timeout.

        """
        self._tab.WaitForDocumentReadyStateToBeComplete()
        utils.poll_for_condition(
                lambda: self._tab.EvaluateJavaScript('pageReady'),
                exception=error.TestError('Test page is not ready!'))


    def expand_page(self):
        """Expand the page to be very large, to allow scrolling."""
        # Fill in the pixel size now and leave a '%s' placeholder for the
        # style property, which is substituted once per dimension below.
        cmd = 'document.body.style.%s = %d+"px"' % (
                '%s', self._DEFAULT_SCROLL * 5)
        self._tab.ExecuteJavaScript(cmd % 'width')
        self._tab.ExecuteJavaScript(cmd % 'height')


    def set_scroll_position(self, value, scroll_vertical=True):
        """Set scroll position to given value.

        @param value: scroll offset to assign to the page body.
        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.

        """
        if scroll_vertical:
            self._tab.ExecuteJavaScript(
                'document.body.scrollTop=%s' % value)
        else:
            self._tab.ExecuteJavaScript(
                'document.body.scrollLeft=%s' % value)


    def set_default_scroll_position(self, scroll_vertical=True):
        """Set scroll position of page to default.

        The attempt is made up to two times before giving up.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.
        @raise: TestError if page is not set to default scroll position

        """
        total_tries = 2
        for i in range(total_tries):
            try:
                self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical)
                self.wait_for_default_scroll_position(scroll_vertical)
            except error.TestError:
                if i == total_tries - 1:
                    pos = self.get_scroll_position(scroll_vertical)
                    logging.error('SCROLL POSITION: %s', pos)
                    # Bare raise keeps the original traceback.
                    raise
            else:
                break


    def get_scroll_position(self, scroll_vertical=True):
        """Return current scroll position of page.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.

        @returns: the scroll offset converted to an int.

        """
        if scroll_vertical:
            return int(self._tab.EvaluateJavaScript('document.body.scrollTop'))
        else:
            return int(self._tab.EvaluateJavaScript('document.body.scrollLeft'))


    def wait_for_default_scroll_position(self, scroll_vertical=True):
        """Wait for page to be at the default scroll position.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.

        @raise: TestError if page does not reach the default scroll position.

        """
        utils.poll_for_condition(
                lambda: self.get_scroll_position(
                        scroll_vertical) == self._DEFAULT_SCROLL,
                exception=error.TestError('Page not set to default scroll!'))


    def wait_for_scroll_position_to_settle(self, scroll_vertical=True):
        """Wait for page to move and then stop moving.

        @param scroll_vertical: True for vertical scroll and
                                False for horizontal scroll.

        @raise: TestError if page either does not move or does not stop moving.

        """
        # Wait until page starts moving.
        utils.poll_for_condition(
                lambda: self.get_scroll_position(
                        scroll_vertical) != self._DEFAULT_SCROLL,
                exception=error.TestError('No scrolling occurred!'), timeout=30)

        # Wait until page has stopped moving.
        self._previous = self._DEFAULT_SCROLL
        def _movement_stopped():
            """True once two consecutive polls see the same position."""
            # Watch the same axis as the caller asked for; reading the
            # default (vertical) axis would report a horizontal scroll as
            # settled while the page is still moving.
            current = self.get_scroll_position(scroll_vertical)
            result = current == self._previous
            self._previous = current
            return result

        utils.poll_for_condition(
                _movement_stopped, sleep_interval=1,
                exception=error.TestError('Page did not stop moving!'),
                timeout=30)


    def get_page_width(self):
        """Return window.innerWidth for this page."""
        return int(self._tab.EvaluateJavaScript('window.innerWidth'))


class EventsPage(TestPage):
    """TestPage specialised for the touch events test page.

    The page keeps JavaScript variables describing the input events it has
    seen; the methods here read and reset them.

    """
    def __init__(self, cr, httpdir):
        """Copy the events page into httpdir and open it in self._tab.

        @param cr: chrome.Chrome() object
        @param httpdir: the directory to use for SetHTTPServerDirectories

        """
        filename = 'touch_events_test_page.html'
        # The page lives next to this module; copy it to where it is served.
        module_dir = os.path.dirname(os.path.realpath(__file__))
        source_path = os.path.join(module_dir, filename)
        served_path = os.path.join(httpdir, filename)
        shutil.copyfile(source_path, served_path)

        super(EventsPage, self).__init__(cr, httpdir, filename)


    def clear_previous_events(self):
        """Reset the test page to the state it had before any events."""
        for statement in ('pageReady = false', 'clearPreviousEvents()'):
            self._tab.ExecuteJavaScript(statement)
        self.wait_for_page_ready()


    def get_events_log(self):
        """Return the page's eventLog variable."""
        events_log = self._tab.EvaluateJavaScript('eventLog')
        return events_log


    def log_events(self):
        """Write the page's event log to logging.info."""
        logging.info('EVENTS LOG:')
        events_log = self.get_events_log()
        logging.info(events_log)


    def get_time_of_last_event(self):
        """Return the page's timeOfLastEvent variable."""
        last_event_time = self._tab.EvaluateJavaScript('timeOfLastEvent')
        return last_event_time


    def get_event_count(self):
        """Return the page's eventCount variable."""
        event_count = self._tab.EvaluateJavaScript('eventCount')
        return event_count


    def get_scroll_delta(self, is_vertical):
        """Return the net scrolling the test page has seen on one axis.

        @param is_vertical: True for vertical scrolling; False for horizontal.

        """
        if is_vertical:
            axis = 'y'
        else:
            axis = 'x'
        expression = 'netScrollDelta.%s' % axis
        return self._tab.EvaluateJavaScript(expression)


    def get_click_count(self):
        """Return the page's clickCount variable."""
        click_count = self._tab.EvaluateJavaScript('clickCount')
        return click_count


    def wait_for_events_to_complete(self, delay_secs=1, timeout=60):
        """Wait until the event count is non-zero and stops changing.

        @param delay_secs: the polling frequency in seconds.
        @param timeout: the number of seconds to wait for events to complete.
        @raises: error.TestError if no events occurred.
        @raises: error.TestError if events did not stop after timeout seconds.

        """
        # The last observed count is kept on the instance so that the nested
        # polling function can update it between calls.
        self._tmp_previous_event_count = -1

        def _count_is_stable():
            """True when events were seen and none arrived since last poll."""
            seen = self.get_event_count()
            newly_arrived = seen - self._tmp_previous_event_count
            self._tmp_previous_event_count = seen
            return seen != 0 and newly_arrived == 0

        try:
            utils.poll_for_condition(
                    _count_is_stable, timeout=timeout,
                    sleep_interval=delay_secs, exception=error.TestError())
        except error.TestError:
            # Tell "nothing ever happened" apart from "never went quiet".
            if self._tmp_previous_event_count != 0:
                self.log_events()
                raise error.TestError('Touch events did not stop!')
            raise error.TestError('No touch event was seen!')


    def set_prevent_defaults(self, value):
        """Set whether to allow default event actions to go through.

        E.g. if this is True, a two finger horizontal scroll will not actually
        produce history navigation on the browser.

        @param value: True for prevent defaults; False to allow them.

        """
        # str(True).lower() gives the JavaScript literal 'true'.
        statement = 'preventDefaults = %s;' % str(value).lower()
        self._tab.ExecuteJavaScript(statement)