普通文本  |  209行  |  7.68 KB

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import dbus
import logging
import pipes
import re
import shlex

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error


# Outcome of one dbus-send invocation:
#   sender    - the temporary bus name of dbus-send itself,
#   responder - the bus name of the remote process,
#   response  - the reply payload converted to native Python values.
DBusSendResult = collections.namedtuple(
        'DBusSendResult', 'sender responder response')
# Internal only: a single parsed "dict entry(...)" pair.  Being a 2-tuple, a
# list of these can be handed straight to dict().
DictEntry = collections.namedtuple('DictEntry', 'key value')


def _build_token_stream(headerless_dbus_send_output):
    """A tokenizer for dbus-send output.

    The output is basically just like splitting on whitespace, except that
    strings are kept together by " characters.

    @param headerless_dbus_send_output: list of lines of dbus-send output
            without the meta-information prefix.
    @return list of tokens in dbus-send output.
    """
    return shlex.split(' '.join(headerless_dbus_send_output))


def _parse_value(token_stream):
    """Turn a stream of tokens from dbus-send output into native python types.

    @param token_stream: output from _build_token_stream() above.

    """
    # Assumes properly tokenized output (strings with spaces handled).
    # Assumes tokens are pre-stripped
    token_type = token_stream.pop(0)
    if token_type == 'variant':
        token_type = token_stream.pop(0)
    if token_type == 'object':
        token_type = token_stream.pop(0)  # Should be 'path'
    token_value = token_stream.pop(0)
    INT_TYPES = ('int16', 'uint16', 'int32', 'uint32',
                 'int64', 'uint64', 'byte')
    if token_type in INT_TYPES:
        return int(token_value)
    if token_type == 'string' or token_type == 'path':
        return token_value  # shlex removed surrounding " chars.
    if token_type == 'boolean':
        return token_value == 'true'
    if token_type == 'double':
        return float(token_value)
    if token_type == 'array':
        values = []
        while token_stream[0] != ']':
            values.append(_parse_value(token_stream))
        token_stream.pop(0)
        if values and all([isinstance(x, DictEntry) for x in values]):
            values = dict(values)
        return values
    if token_type == 'dict':
        assert token_value == 'entry('
        key = _parse_value(token_stream)
        value = _parse_value(token_stream)
        assert token_stream.pop(0) == ')'
        return DictEntry(key=key, value=value)
    raise error.TestError('Unhandled DBus type found: %s' % token_type)


def _parse_dbus_send_output(dbus_send_stdout):
    """Turn dbus-send output into usable Python types.

    This looks like:

    localhost ~ # dbus-send --system --dest=org.chromium.flimflam \
            --print-reply --reply-timeout=2000 / \
            org.chromium.flimflam.Manager.GetProperties
    method return sender=:1.12 -> dest=:1.37 reply_serial=2
       array [
          dict entry(
             string "ActiveProfile"
             variant             string "/profile/default"
          )
          dict entry(
             string "ArpGateway"
             variant             boolean true
          )
          ...
       ]

    @param dbus_send_stdout: string stdout from dbus-send
    @return a DBusSendResult.  Its |response| is None when the reply consists
            of the header line only (no return value).
    @raises error.TestError if the output is empty or the header line does
            not have the expected form.

    """
    lines = dbus_send_stdout.strip().splitlines()
    if not lines:
        raise error.TestError('Could not parse empty dbus-send output.')
    # The first line contains meta-information about the response
    header = lines[0]
    body_lines = lines[1:]
    # Raw strings keep \d and \. as regex escapes rather than (invalid)
    # string escapes; the compiled patterns are unchanged.
    dbus_address_pattern = r'[:\d\.]+'
    match = re.match(
            r'method return sender=(%s) -> dest=(%s) reply_serial=\d+' %
            (dbus_address_pattern, dbus_address_pattern), header)
    if match is None:
        raise error.TestError('Could not parse dbus-send header: %s' % header)

    # NOTE(review): the header's sender= field is stored as |sender| and its
    # dest= field as |responder|.  In a method-return message sender= is
    # presumably the replying process and dest= dbus-send itself, which looks
    # swapped relative to the field description on DBusSendResult.  Left as
    # is because callers may depend on it -- confirm before changing.
    sender = match.group(1)
    responder = match.group(2)
    token_stream = _build_token_stream(body_lines)
    # A reply without any value leaves no tokens; there is nothing to parse.
    ret_val = _parse_value(token_stream) if token_stream else None
    # Note that DBus permits multiple response values, and this is not handled.
    logging.debug('Got DBus response: %r', ret_val)
    return DBusSendResult(sender=sender, responder=responder, response=ret_val)


def _build_arg_string(raw_args):
    """Construct a string of arguments to a DBus method as dbus-send expects.

    Each argument becomes a "<type>:<value>" word; the words are joined with
    single spaces.  String arguments are shell-quoted.

    @param raw_args list of dbus.* type objects to serialize.  Supported are
            dbus.String, dbus.Boolean and the numeric types listed below.
    @return string suitable for dbus-send.
    @raises error.TestError if an argument is of an unsupported type.

    """
    # dbus-send type prefix for each supported numeric dbus type.
    numeric_prefixes = {
            dbus.Int16: 'int16:',
            dbus.Int32: 'int32:',
            dbus.Int64: 'int64:',
            dbus.UInt16: 'uint16:',
            dbus.UInt32: 'uint32:',
            dbus.UInt64: 'uint64:',
            dbus.Double: 'double:',
            dbus.Byte: 'byte:',
    }
    arg_list = []
    for arg in raw_args:
        if isinstance(arg, dbus.String):
            arg_list.append(pipes.quote('string:%s' %
                                        arg.replace('"', r'\"')))
            continue
        if isinstance(arg, dbus.Boolean):
            arg_list.append('boolean:true' if arg else 'boolean:false')
            continue
        # for/else: the else branch runs only when no numeric type matched.
        # Previously a `continue` in this inner loop merely advanced the
        # inner loop, so every numeric argument fell through to the raise.
        for prim_type, prefix in numeric_prefixes.items():
            if isinstance(arg, prim_type):
                arg_list.append(prefix + str(arg))
                break
        else:
            raise error.TestError('No support for serializing %r' % arg)
    return ' '.join(arg_list)


def dbus_send(bus_name, interface, object_path, method_name, args=None,
              host=None, timeout_seconds=2, tolerate_failures=False, user=None):
    """Call a DBus method on the system bus via the dbus-send tool.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface of object to call method on.
    @param object_path: string DBus path of remote object to call method on.
    @param method_name: string name of method to call.
    @param args: optional list of arguments.  Arguments must be of types
            from the python dbus module.
    @param host: An optional host object if running against a remote host.
            When None the command is run through utils.run.
    @param timeout_seconds: number of seconds to wait for a response.
    @param tolerate_failures: boolean True to ignore problems receiving a
            response.  Passed to the runner as ignore_status.
    @param user: An optional user name; when given, dbus-send is run through
            `sudo -u <user>`.
    @return a DBusSendResult, or None if dbus-send exited with a non-zero
            status.

    """
    if host is None:
        run = utils.run
    else:
        run = host.run

    # dbus-send takes its reply timeout in milliseconds.
    format_values = (int(timeout_seconds * 1000), bus_name,
                     object_path, interface, method_name)
    cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s '
           '%s %s.%s' % format_values)

    if user is not None:
        cmd = 'sudo -u %s %s' % (user, cmd)
    if args is not None:
        cmd = ' '.join([cmd, _build_arg_string(args)])

    result = run(cmd, ignore_status=tolerate_failures)
    if result.exit_status == 0:
        return _parse_dbus_send_output(result.stdout)
    logging.debug('%r', result.stdout)
    return None


def get_property(bus_name, interface, object_path, property_name, host=None):
    """Fetch a DBus property by calling Get on the standard properties
    interface.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface exposing the property.
    @param object_path: string DBus path of remote object to call method on.
    @param property_name: string name of property to get.
    @param host: An optional host object if running against a remote host.
    @return whatever dbus_send() returns for the Get call; the result is
            passed through unchanged.

    """
    # Get takes the owning interface followed by the property name.
    get_args = [dbus.String(interface), dbus.String(property_name)]
    return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get',
                     args=get_args, host=host)