普通文本  |  600行  |  20.39 KB

#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus
import logging
import logging.handlers
import multiprocessing

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.client.cros import xmlrpc_server
from autotest_lib.client.cros import constants
from autotest_lib.client.cros import cros_ui
from autotest_lib.client.cros import sys_power
from autotest_lib.client.cros import tpm_store
from autotest_lib.client.cros.networking import shill_proxy
from autotest_lib.client.cros.networking import wifi_proxy


class ShillXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
    """Exposes methods called remotely during WiFi autotests.

    All instance methods of this object without a preceding '_' are exposed via
    an XMLRPC server.  This is not a stateless handler object, which means that
    if you store state inside the delegate, that state will remain around for
    future calls.

    """

    DEFAULT_TEST_PROFILE_NAME = 'test'
    DBUS_DEVICE = 'Device'

    def __init__(self):
        self._wifi_proxy = wifi_proxy.WifiProxy()
        self._tpm_store = tpm_store.TPMStore()


    def __enter__(self):
        super(ShillXmlRpcDelegate, self).__enter__()
        if not cros_ui.stop(allow_fail=True):
            logging.error('UI did not stop, there could be trouble ahead.')
        self._tpm_store.__enter__()


    def __exit__(self, exception, value, traceback):
        super(ShillXmlRpcDelegate, self).__exit__(exception, value, traceback)
        self._tpm_store.__exit__(exception, value, traceback)
        self.enable_ui()


    @xmlrpc_server.dbus_safe(False)
    def create_profile(self, profile_name):
        """Create a shill profile.

        @param profile_name string name of profile to create.
        @return True on success, False otherwise.

        """
        self._wifi_proxy.manager.CreateProfile(profile_name)
        return True


    @xmlrpc_server.dbus_safe(False)
    def push_profile(self, profile_name):
        """Push a shill profile.

        @param profile_name string name of profile to push.
        @return True on success, False otherwise.

        """
        self._wifi_proxy.manager.PushProfile(profile_name)
        return True


    @xmlrpc_server.dbus_safe(False)
    def pop_profile(self, profile_name):
        """Pop a shill profile.

        @param profile_name string name of profile to pop.
        @return True on success, False otherwise.

        """
        if profile_name is None:
            self._wifi_proxy.manager.PopAnyProfile()
        else:
            self._wifi_proxy.manager.PopProfile(profile_name)
        return True


    @xmlrpc_server.dbus_safe(False)
    def remove_profile(self, profile_name):
        """Remove a profile from disk.

        @param profile_name string name of profile to remove.
        @return True on success, False otherwise.

        """
        self._wifi_proxy.manager.RemoveProfile(profile_name)
        return True


    @xmlrpc_server.dbus_safe(False)
    def clean_profiles(self):
        """Pop and remove shill profiles above the default profile.

        @return True on success, False otherwise.

        """
        while True:
            active_profile = self._wifi_proxy.get_active_profile()
            profile_name = self._wifi_proxy.dbus2primitive(
                    active_profile.GetProperties(utf8_strings=True)['Name'])
            if profile_name == 'default':
                return True
            self._wifi_proxy.manager.PopProfile(profile_name)
            self._wifi_proxy.manager.RemoveProfile(profile_name)


    @xmlrpc_server.dbus_safe(False)
    def configure_service_by_guid(self, raw_params):
        """Configure a service referenced by a GUID.

        @param raw_params serialized ConfigureServiceParameters.

        """
        params = xmlrpc_datatypes.deserialize(raw_params)
        shill = self._wifi_proxy
        properties = {}
        if params.autoconnect is not None:
            properties[shill.SERVICE_PROPERTY_AUTOCONNECT] = params.autoconnect
        if params.passphrase is not None:
            properties[shill.SERVICE_PROPERTY_PASSPHRASE] = params.passphrase
        if properties:
            self._wifi_proxy.configure_service_by_guid(params.guid, properties)
        return True


    @xmlrpc_server.dbus_safe(False)
    def configure_wifi_service(self, raw_params):
        """Configure a WiFi service

        @param raw_params serialized AssociationParameters.
        @return True on success, False otherwise.

        """
        params = xmlrpc_datatypes.deserialize(raw_params)
        return self._wifi_proxy.configure_wifi_service(
                params.ssid,
                params.security,
                params.security_parameters,
                save_credentials=params.save_credentials,
                station_type=params.station_type,
                hidden_network=params.is_hidden,
                guid=params.guid,
                autoconnect=params.autoconnect)


    def connect_wifi(self, raw_params):
        """Block and attempt to connect to wifi network.

        @param raw_params serialized AssociationParameters.
        @return serialized AssociationResult

        """
        logging.debug('connect_wifi()')
        params = xmlrpc_datatypes.deserialize(raw_params)
        params.security_config.install_client_credentials(self._tpm_store)
        wifi_if = params.bgscan_config.interface
        if wifi_if is None:
            logging.info('Using default interface for bgscan configuration')
            interfaces = self.list_controlled_wifi_interfaces()
            if not interfaces:
                return xmlrpc_datatypes.AssociationResult(
                        failure_reason='No wifi interfaces found?')

            if len(interfaces) > 1:
                logging.error('Defaulting to first interface of %r', interfaces)
            wifi_if = interfaces[0]
        if not self._wifi_proxy.configure_bgscan(
                wifi_if,
                method=params.bgscan_config.method,
                short_interval=params.bgscan_config.short_interval,
                long_interval=params.bgscan_config.long_interval,
                signal=params.bgscan_config.signal):
            return xmlrpc_datatypes.AssociationResult(
                    failure_reason='Failed to configure bgscan')

        raw = self._wifi_proxy.connect_to_wifi_network(
                params.ssid,
                params.security,
                params.security_parameters,
                params.save_credentials,
                station_type=params.station_type,
                hidden_network=params.is_hidden,
                guid=params.guid,
                discovery_timeout_seconds=params.discovery_timeout,
                association_timeout_seconds=params.association_timeout,
                configuration_timeout_seconds=params.configuration_timeout)
        result = xmlrpc_datatypes.AssociationResult.from_dbus_proxy_output(raw)
        return result


    @xmlrpc_server.dbus_safe(False)
    def delete_entries_for_ssid(self, ssid):
        """Delete a profile entry.

        @param ssid string of WiFi service for which to delete entries.
        @return True on success, False otherwise.

        """
        shill = self._wifi_proxy
        for profile in shill.get_profiles():
            profile_properties = shill.dbus2primitive(
                    profile.GetProperties(utf8_strings=True))
            entry_ids = profile_properties[shill.PROFILE_PROPERTY_ENTRIES]
            for entry_id in entry_ids:
                entry = profile.GetEntry(entry_id)
                if shill.dbus2primitive(entry[shill.ENTRY_FIELD_NAME]) == ssid:
                    profile.DeleteEntry(entry_id)
        return True


    def init_test_network_state(self):
        """Create a clean slate for tests with respect to remembered networks.

        For shill, this means popping and removing profiles, removing all WiFi
        entries from the default profile, and pushing a 'test' profile.

        @return True iff operation succeeded, False otherwise.

        """
        self.clean_profiles()
        self._wifi_proxy.remove_all_wifi_entries()
        self.remove_profile(self.DEFAULT_TEST_PROFILE_NAME)
        worked = self.create_profile(self.DEFAULT_TEST_PROFILE_NAME)
        if worked:
            worked = self.push_profile(self.DEFAULT_TEST_PROFILE_NAME)
        return worked


    @xmlrpc_server.dbus_safe(None)
    def list_controlled_wifi_interfaces(self):
        """List WiFi interfaces controlled by shill.

        @return list of string WiFi device names (e.g. ['mlan0'])

        """
        ret = []
        devices = self._wifi_proxy.get_devices()
        for device in devices:
            properties = self._wifi_proxy.dbus2primitive(
                    device.GetProperties(utf8_strings=True))
            if properties[self._wifi_proxy.DEVICE_PROPERTY_TYPE] != 'wifi':
                continue
            ret.append(properties[self._wifi_proxy.DEVICE_PROPERTY_NAME])
        return ret


    @xmlrpc_server.dbus_safe(False)
    def disconnect(self, ssid):
        """Attempt to disconnect from the given ssid.

        Blocks until disconnected or operation has timed out.  Returns True iff
        disconnect was successful.

        @param ssid string network to disconnect from.
        @return bool True on success, False otherwise.

        """
        logging.debug('disconnect()')
        result = self._wifi_proxy.disconnect_from_wifi_network(ssid)
        successful, duration, message = result
        if successful:
            level = logging.info
        else:
            level = logging.error
        level('Disconnect result: %r, duration: %d, reason: %s',
              successful, duration, message)
        return successful is True


    def wait_for_service_states(self, ssid, states, timeout_seconds):
        """Wait for service to achieve one state out of a list of states.

        @param ssid string the network to connect to (e.g. 'GoogleGuest').
        @param states tuple the states for which to wait
        @param timeout_seconds int seconds to wait for a state

        """
        return self._wifi_proxy.wait_for_service_states(
                ssid, states, timeout_seconds)


    @xmlrpc_server.dbus_safe(None)
    def get_service_order(self):
        """Get the shill service order.

        @return string service order on success, None otherwise.

        """
        return str(self._wifi_proxy.manager.GetServiceOrder())


    @xmlrpc_server.dbus_safe(False)
    def set_service_order(self, order):
        """Set the shill service order.

        @param order string comma-delimited service order (eg. 'ethernet,wifi')
        @return bool True on success, False otherwise.

        """
        self._wifi_proxy.manager.SetServiceOrder(dbus.String(order))
        return True


    @xmlrpc_server.dbus_safe(None)
    def get_service_properties(self, ssid):
        """Get a dict of properties for a service.

        @param ssid string service to get properties for.
        @return dict of Python friendly native types or None on failures.

        """
        discovery_params = {self._wifi_proxy.SERVICE_PROPERTY_TYPE: 'wifi',
                            self._wifi_proxy.SERVICE_PROPERTY_NAME: ssid}
        service_path = self._wifi_proxy.manager.FindMatchingService(
                discovery_params)
        service_object = self._wifi_proxy.get_dbus_object(
                self._wifi_proxy.DBUS_TYPE_SERVICE, service_path)
        service_properties = service_object.GetProperties(
                utf8_strings=True)
        return self._wifi_proxy.dbus2primitive(service_properties)


    @xmlrpc_server.dbus_safe(False)
    def get_active_wifi_SSIDs(self):
        """@return list of string SSIDs with at least one BSS we've scanned."""
        return self._wifi_proxy.get_active_wifi_SSIDs()


    @xmlrpc_server.dbus_safe(False)
    def set_sched_scan(self, enable):
        """Configure scheduled scan.

        @param enable bool flag indicating to enable/disable scheduled scan.
        @return True on success, False otherwise.

        """
        self._wifi_proxy.manager.set_sched_scan(enable)
        return True


    def enable_ui(self):
        """@return True iff the UI was successfully started."""
        return cros_ui.start(allow_fail=True, wait_for_login_prompt=False) == 0


    def sync_time_to(self, epoch_seconds):
        """Sync time on the DUT to |epoch_seconds| from the epoch.

        @param epoch_seconds: float number of seconds from the epoch.

        """
        utils.run('date -u --set=@%f' % epoch_seconds)
        return True


    @staticmethod
    def do_suspend(seconds):
        """Suspend DUT using the power manager.

        @param seconds: The number of seconds to suspend the device.

        """
        return sys_power.do_suspend(seconds)


    @staticmethod
    def do_suspend_bg(seconds):
        """Suspend DUT using the power manager - non-blocking.

        @param seconds int The number of seconds to suspend the device.

        """
        process = multiprocessing.Process(target=sys_power.do_suspend,
                                          args=(seconds, 1))
        process.start()
        return True


    @xmlrpc_server.dbus_safe(None)
    def get_dbus_property_on_device(self, wifi_interface, prop_name):
        """Get a property for the given WiFi device.

        @param wifi_interface: string name of interface being queried.
        @param prop_name: the name of the property.
        @return the current value of the property.

        """
        dbus_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if dbus_object is None:
            return None

        object_properties = dbus_object.GetProperties(utf8_strings=True)
        if prop_name not in object_properties:
            return None

        return self._wifi_proxy.dbus2primitive(
                object_properties[prop_name])


    @xmlrpc_server.dbus_safe(False)
    def set_dbus_property_on_device(self, wifi_interface, prop_name, value):
        """Set a property on the given WiFi device.

        @param wifi_interface: the device to set a property for.
        @param prop_name: the name of the property.
        @param value: the desired value of the property.
        @return True if successful, False otherwise.

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return False

        shill_proxy.ShillProxy.set_dbus_property(device_object,
                                                 prop_name,
                                                 value)
        return True


    @xmlrpc_server.dbus_safe(False)
    def request_roam_dbus(self, bssid, interface):
        """Request that we roam to the specified BSSID.

        Note that this operation assumes that:

        1) We're connected to an SSID for which |bssid| is a member.
        2) There is a BSS with an appropriate ID in our scan results.

        @param bssid: string BSSID of BSS to roam to.

        """

        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': interface})
        if device_object is None:
            return False
        device_object.RequestRoam(bssid)
        return True


    @xmlrpc_server.dbus_safe(False)
    def set_device_enabled(self, wifi_interface, enabled):
        """Enable or disable the WiFi device.

        @param wifi_interface: string name of interface being modified.
        @param enabled: boolean; true if this device should be enabled,
                false if this device should be disabled.
        @return True if it worked; false, otherwise

        """
        interface = {'Name': wifi_interface}
        dbus_object = self._wifi_proxy.find_object(self.DBUS_DEVICE,
                                                   interface)
        if dbus_object is None:
            return False

        if enabled:
            dbus_object.Enable()
        else:
            dbus_object.Disable()
        return True


    def discover_tdls_link(self, wifi_interface, peer_mac_address):
        """Send a TDLS Discover to |peer_mac_address| on |wifi_interface|.

        @param wifi_interface: string name of interface to send the discover on.
        @param peer_mac_address: string mac address of the TDLS peer device.

        @return True if it the operation was initiated; False otherwise

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return False
        device_object.PerformTDLSOperation('Discover', peer_mac_address)
        return True


    def establish_tdls_link(self, wifi_interface, peer_mac_address):
        """Establish a TDLS link with |peer_mac_address| on |wifi_interface|.

        @param wifi_interface: string name of interface to establish a link on.
        @param peer_mac_address: string mac address of the TDLS peer device.

        @return True if it the operation was initiated; False otherwise

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return False
        device_object.PerformTDLSOperation('Setup', peer_mac_address)
        return True


    @xmlrpc_server.dbus_safe(False)
    def query_tdls_link(self, wifi_interface, peer_mac_address):
        """Query the TDLS link with |peer_mac_address| on |wifi_interface|.

        @param wifi_interface: string name of interface to establish a link on.
        @param peer_mac_address: string mac address of the TDLS peer device.

        @return string indicating the current TDLS link status.

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return None
        return self._wifi_proxy.dbus2primitive(
                device_object.PerformTDLSOperation('Status', peer_mac_address))


    @xmlrpc_server.dbus_safe(False)
    def add_wake_packet_source(self, wifi_interface, source_ip):
        """Set up the NIC to wake on packets from the given source IP.

        @param wifi_interface: string name of interface to establish WoWLAN on.
        @param source_ip: string IP address of packet source, i.e. "127.0.0.1"

        @return True on success, False otherwise.

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return False
        device_object.AddWakeOnPacketConnection(source_ip)
        return True


    @xmlrpc_server.dbus_safe(False)
    def remove_wake_packet_source(self, wifi_interface, source_ip):
        """Stop waking on packets from the given source IP.

        @param wifi_interface: string name of interface to establish WoWLAN on.
        @param source_ip: string IP address of packet source, i.e. "127.0.0.1"

        @return True on success, False otherwise.

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return False
        device_object.RemoveWakeOnPacketConnection(source_ip)
        return True


    @xmlrpc_server.dbus_safe(False)
    def remove_all_wake_packet_sources(self, wifi_interface):
        """Stop waking on packets from any IP.

        @param wifi_interface: string name of interface to establish WoWLAN on.

        @return True on success, False otherwise.

        """
        device_object = self._wifi_proxy.find_object(
                self.DBUS_DEVICE, {'Name': wifi_interface})
        if device_object is None:
            return False
        device_object.RemoveAllWakeOnPacketConnections()
        return True



if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    handler = logging.handlers.SysLogHandler(address = '/dev/log')
    formatter = logging.Formatter(
            'shill_xmlrpc_server: [%(levelname)s] %(message)s')
    handler.setFormatter(formatter)
    logging.getLogger().addHandler(handler)
    logging.debug('shill_xmlrpc_server main...')
    server = xmlrpc_server.XmlRpcServer('localhost',
                                         constants.SHILL_XMLRPC_SERVER_PORT)
    server.register_delegate(ShillXmlRpcDelegate())
    server.run()