普通文本  |  237行  |  7.42 KB

#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import cgi
import json
import logging
import logging.handlers
import os
import sys

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib.cros import chrome, xmlrpc_server
from autotest_lib.client.cros import constants


class InteractiveXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
    """Exposes methods called remotely to create interactive tests.

    All instance methods of this object without a preceding '_' are exposed via
    an XML-RPC server. This is not a stateless handler object, which means that
    if you store state inside the delegate, that state will remain around for
    future calls.
    """

    def login(self):
        """Login to the system and open a tab.

        The tab opened is used by other methods on this server to interact
        with the user.

        @return True.

        """
        self._chrome = chrome.Chrome()
        self._chrome.browser.platform.SetHTTPServerDirectories(
                os.path.dirname(sys.argv[0]))
        self._tab = self._chrome.browser.tabs[0]
        self._tab.Navigate(
                self._chrome.browser.platform.http_server.UrlOf('shell.html'))

        return True


    def set_output(self, html):
        """Replace the contents of the tab.

        @param html: HTML document to replace tab contents with.

        @return True.

        """
        # JSON does a better job of escaping HTML for JavaScript than we could
        # with string.replace().
        html_escaped = json.dumps(html)
        # Use JavaScript to append the output and scroll to the bottom of the
        # open tab.
        self._tab.ExecuteJavaScript('document.body.innerHTML = %s; ' %
                                    html_escaped)
        self._tab.Activate()
        self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter()
        return True


    def append_output(self, html):
        """Append HTML to the contents of the tab.

        @param html: HTML to append to the existing tab contents.

        @return True.

        """
        # JSON does a better job of escaping HTML for JavaScript than we could
        # with string.replace().
        html_escaped = json.dumps(html)
        # Use JavaScript to append the output and scroll to the bottom of the
        # open tab.
        self._tab.ExecuteJavaScript(
                ('document.body.innerHTML += %s; ' % html_escaped) +
                'window.scrollTo(0, document.body.scrollHeight);')
        self._tab.Activate()
        self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter()
        return True


    def append_buttons(self, *args):
        """Append confirmation buttons to the tab.

        Each button is given an index, 0 for the first button, 1 for the second,
        and so on.

        @param title...: Title of button to append.

        @return True.

        """
        html = ''
        index = 0
        for title in args:
            onclick = 'submit_button(%d)' % index
            html += ('<input type="button" value="%s" onclick="%s">' % (
                     cgi.escape(title),
                     cgi.escape(onclick)))
            index += 1
        return self.append_output(html)


    def wait_for_button(self, timeout):
        """Wait for a button to be clicked.

        Call append_buttons() before this to add buttons to the document.

        @param timeout: Maximum time, in seconds, to wait for a click.

        @return index of button that was clicked.

        """
        # Wait for the button to be clicked.
        utils.poll_for_condition(
                condition=lambda:
                    self._tab.EvaluateJavaScript('window.__ready') == 1,
                desc='User clicked on button.',
                timeout=timeout)
        # Fetch the result.
        result = self._tab.EvaluateJavaScript('window.__result')
        # Reset for the next button.
        self._tab.ExecuteJavaScript(
                'window.__ready = 0; '
                'window.__result = null;')
        return result


    def check_for_button(self):
        """Check whether a button has been clicked.

        Call append_buttons() before this to add buttons to the document.

        @return index of button that was clicked or -1 if no button
            has been clicked.

        """
        if not self._tab.EvaluateJavaScript('window.__ready'):
            return -1
        # Fetch the result.
        result = self._tab.EvaluateJavaScript('window.__result')
        # Reset for the next button.
        self._tab.ExecuteJavaScript(
                'window.__ready = 0; '
                'window.__result = null;')
        return result


    def append_list(self, name):
        """Append a results list to the contents of the tab.

        @param name: Name to use for making modifications to the list.

        @return True.

        """
        html = '<div id="%s"></div>' % cgi.escape(name)
        return self.append_output(html)


    def append_list_item(self, list_name, item_name, html):
        """Append an item to a results list.

        @param list_name: Name of list provided to append_list().
        @param item_name: Name to use for making modifications to the item.
        @param html: HTML to place in the list item.

        @return True.

        """
        # JSON does a better job of escaping HTML for JavaScript than we could
        # with string.replace().
        item_html = '"<div id=\\"%s\\"></div>"' % cgi.escape(item_name)
        # Use JavaScript to append the output.
        self._tab.ExecuteJavaScript(
                'document.getElementById("%s").innerHTML += %s; ' % (
                        cgi.escape(list_name),
                        item_html))
        self._tab.Activate()
        self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter()
        return self.replace_list_item(item_name, html)


    def replace_list_item(self, item_name, html):
        """Replace an item in a results list.

        @param item_name: Name of item provided to append_list_item().
        @param html: HTML to place in the list item.

        @return True.

        """
        # JSON does a better job of escaping HTML for JavaScript than we could
        # with string.replace().
        html_escaped = json.dumps(html)
        # Use JavaScript to append the output.
        self._tab.ExecuteJavaScript(
                'document.getElementById("%s").innerHTML = %s; ' % (
                        cgi.escape(item_name),
                        html_escaped))
        self._tab.Activate()
        self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter()
        return True


    def close(self):
        """Close the browser.

        @return True.

        """
        if hasattr(self, '_chrome'):
            self._chrome.browser.Close()
        return True


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    handler = logging.handlers.SysLogHandler(address='/dev/log')
    formatter = logging.Formatter(
            'interactive_xmlrpc_server: [%(levelname)s] %(message)s')
    handler.setFormatter(formatter)
    logging.getLogger().addHandler(handler)
    logging.debug('interactive_xmlrpc_server main...')
    server = xmlrpc_server.XmlRpcServer(
            'localhost',
            constants.INTERACTIVE_XMLRPC_SERVER_PORT)
    server.register_delegate(InteractiveXmlRpcDelegate())
    server.run()