#!/usr/bin/python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import logging
import logging.handlers
import multiprocessing
import common
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.client.cros import xmlrpc_server
from autotest_lib.client.cros import constants
from autotest_lib.client.cros import cros_ui
from autotest_lib.client.cros import sys_power
from autotest_lib.client.cros import tpm_store
from autotest_lib.client.cros.networking import shill_proxy
from autotest_lib.client.cros.networking import wifi_proxy
class ShillXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
"""Exposes methods called remotely during WiFi autotests.
All instance methods of this object without a preceding '_' are exposed via
an XMLRPC server. This is not a stateless handler object, which means that
if you store state inside the delegate, that state will remain around for
future calls.
"""
DEFAULT_TEST_PROFILE_NAME = 'test'
DBUS_DEVICE = 'Device'
def __init__(self):
self._wifi_proxy = wifi_proxy.WifiProxy()
self._tpm_store = tpm_store.TPMStore()
def __enter__(self):
super(ShillXmlRpcDelegate, self).__enter__()
if not cros_ui.stop(allow_fail=True):
logging.error('UI did not stop, there could be trouble ahead.')
self._tpm_store.__enter__()
def __exit__(self, exception, value, traceback):
super(ShillXmlRpcDelegate, self).__exit__(exception, value, traceback)
self._tpm_store.__exit__(exception, value, traceback)
self.enable_ui()
@xmlrpc_server.dbus_safe(False)
def create_profile(self, profile_name):
"""Create a shill profile.
@param profile_name string name of profile to create.
@return True on success, False otherwise.
"""
self._wifi_proxy.manager.CreateProfile(profile_name)
return True
@xmlrpc_server.dbus_safe(False)
def push_profile(self, profile_name):
"""Push a shill profile.
@param profile_name string name of profile to push.
@return True on success, False otherwise.
"""
self._wifi_proxy.manager.PushProfile(profile_name)
return True
@xmlrpc_server.dbus_safe(False)
def pop_profile(self, profile_name):
"""Pop a shill profile.
@param profile_name string name of profile to pop.
@return True on success, False otherwise.
"""
if profile_name is None:
self._wifi_proxy.manager.PopAnyProfile()
else:
self._wifi_proxy.manager.PopProfile(profile_name)
return True
@xmlrpc_server.dbus_safe(False)
def remove_profile(self, profile_name):
"""Remove a profile from disk.
@param profile_name string name of profile to remove.
@return True on success, False otherwise.
"""
self._wifi_proxy.manager.RemoveProfile(profile_name)
return True
@xmlrpc_server.dbus_safe(False)
def clean_profiles(self):
"""Pop and remove shill profiles above the default profile.
@return True on success, False otherwise.
"""
while True:
active_profile = self._wifi_proxy.get_active_profile()
profile_name = self._wifi_proxy.dbus2primitive(
active_profile.GetProperties(utf8_strings=True)['Name'])
if profile_name == 'default':
return True
self._wifi_proxy.manager.PopProfile(profile_name)
self._wifi_proxy.manager.RemoveProfile(profile_name)
@xmlrpc_server.dbus_safe(False)
def configure_service_by_guid(self, raw_params):
"""Configure a service referenced by a GUID.
@param raw_params serialized ConfigureServiceParameters.
"""
params = xmlrpc_datatypes.deserialize(raw_params)
shill = self._wifi_proxy
properties = {}
if params.autoconnect is not None:
properties[shill.SERVICE_PROPERTY_AUTOCONNECT] = params.autoconnect
if params.passphrase is not None:
properties[shill.SERVICE_PROPERTY_PASSPHRASE] = params.passphrase
if properties:
self._wifi_proxy.configure_service_by_guid(params.guid, properties)
return True
@xmlrpc_server.dbus_safe(False)
def configure_wifi_service(self, raw_params):
"""Configure a WiFi service
@param raw_params serialized AssociationParameters.
@return True on success, False otherwise.
"""
params = xmlrpc_datatypes.deserialize(raw_params)
return self._wifi_proxy.configure_wifi_service(
params.ssid,
params.security,
params.security_parameters,
save_credentials=params.save_credentials,
station_type=params.station_type,
hidden_network=params.is_hidden,
guid=params.guid,
autoconnect=params.autoconnect)
def connect_wifi(self, raw_params):
"""Block and attempt to connect to wifi network.
@param raw_params serialized AssociationParameters.
@return serialized AssociationResult
"""
logging.debug('connect_wifi()')
params = xmlrpc_datatypes.deserialize(raw_params)
params.security_config.install_client_credentials(self._tpm_store)
wifi_if = params.bgscan_config.interface
if wifi_if is None:
logging.info('Using default interface for bgscan configuration')
interfaces = self.list_controlled_wifi_interfaces()
if not interfaces:
return xmlrpc_datatypes.AssociationResult(
failure_reason='No wifi interfaces found?')
if len(interfaces) > 1:
logging.error('Defaulting to first interface of %r', interfaces)
wifi_if = interfaces[0]
if not self._wifi_proxy.configure_bgscan(
wifi_if,
method=params.bgscan_config.method,
short_interval=params.bgscan_config.short_interval,
long_interval=params.bgscan_config.long_interval,
signal=params.bgscan_config.signal):
return xmlrpc_datatypes.AssociationResult(
failure_reason='Failed to configure bgscan')
raw = self._wifi_proxy.connect_to_wifi_network(
params.ssid,
params.security,
params.security_parameters,
params.save_credentials,
station_type=params.station_type,
hidden_network=params.is_hidden,
guid=params.guid,
discovery_timeout_seconds=params.discovery_timeout,
association_timeout_seconds=params.association_timeout,
configuration_timeout_seconds=params.configuration_timeout)
result = xmlrpc_datatypes.AssociationResult.from_dbus_proxy_output(raw)
return result
@xmlrpc_server.dbus_safe(False)
def delete_entries_for_ssid(self, ssid):
"""Delete a profile entry.
@param ssid string of WiFi service for which to delete entries.
@return True on success, False otherwise.
"""
shill = self._wifi_proxy
for profile in shill.get_profiles():
profile_properties = shill.dbus2primitive(
profile.GetProperties(utf8_strings=True))
entry_ids = profile_properties[shill.PROFILE_PROPERTY_ENTRIES]
for entry_id in entry_ids:
entry = profile.GetEntry(entry_id)
if shill.dbus2primitive(entry[shill.ENTRY_FIELD_NAME]) == ssid:
profile.DeleteEntry(entry_id)
return True
def init_test_network_state(self):
"""Create a clean slate for tests with respect to remembered networks.
For shill, this means popping and removing profiles, removing all WiFi
entries from the default profile, and pushing a 'test' profile.
@return True iff operation succeeded, False otherwise.
"""
self.clean_profiles()
self._wifi_proxy.remove_all_wifi_entries()
self.remove_profile(self.DEFAULT_TEST_PROFILE_NAME)
worked = self.create_profile(self.DEFAULT_TEST_PROFILE_NAME)
if worked:
worked = self.push_profile(self.DEFAULT_TEST_PROFILE_NAME)
return worked
@xmlrpc_server.dbus_safe(None)
def list_controlled_wifi_interfaces(self):
"""List WiFi interfaces controlled by shill.
@return list of string WiFi device names (e.g. ['mlan0'])
"""
ret = []
devices = self._wifi_proxy.get_devices()
for device in devices:
properties = self._wifi_proxy.dbus2primitive(
device.GetProperties(utf8_strings=True))
if properties[self._wifi_proxy.DEVICE_PROPERTY_TYPE] != 'wifi':
continue
ret.append(properties[self._wifi_proxy.DEVICE_PROPERTY_NAME])
return ret
@xmlrpc_server.dbus_safe(False)
def disconnect(self, ssid):
"""Attempt to disconnect from the given ssid.
Blocks until disconnected or operation has timed out. Returns True iff
disconnect was successful.
@param ssid string network to disconnect from.
@return bool True on success, False otherwise.
"""
logging.debug('disconnect()')
result = self._wifi_proxy.disconnect_from_wifi_network(ssid)
successful, duration, message = result
if successful:
level = logging.info
else:
level = logging.error
level('Disconnect result: %r, duration: %d, reason: %s',
successful, duration, message)
return successful is True
def wait_for_service_states(self, ssid, states, timeout_seconds):
"""Wait for service to achieve one state out of a list of states.
@param ssid string the network to connect to (e.g. 'GoogleGuest').
@param states tuple the states for which to wait
@param timeout_seconds int seconds to wait for a state
"""
return self._wifi_proxy.wait_for_service_states(
ssid, states, timeout_seconds)
@xmlrpc_server.dbus_safe(None)
def get_service_order(self):
"""Get the shill service order.
@return string service order on success, None otherwise.
"""
return str(self._wifi_proxy.manager.GetServiceOrder())
@xmlrpc_server.dbus_safe(False)
def set_service_order(self, order):
"""Set the shill service order.
@param order string comma-delimited service order (eg. 'ethernet,wifi')
@return bool True on success, False otherwise.
"""
self._wifi_proxy.manager.SetServiceOrder(dbus.String(order))
return True
@xmlrpc_server.dbus_safe(None)
def get_service_properties(self, ssid):
"""Get a dict of properties for a service.
@param ssid string service to get properties for.
@return dict of Python friendly native types or None on failures.
"""
discovery_params = {self._wifi_proxy.SERVICE_PROPERTY_TYPE: 'wifi',
self._wifi_proxy.SERVICE_PROPERTY_NAME: ssid}
service_path = self._wifi_proxy.manager.FindMatchingService(
discovery_params)
service_object = self._wifi_proxy.get_dbus_object(
self._wifi_proxy.DBUS_TYPE_SERVICE, service_path)
service_properties = service_object.GetProperties(
utf8_strings=True)
return self._wifi_proxy.dbus2primitive(service_properties)
@xmlrpc_server.dbus_safe(False)
def get_active_wifi_SSIDs(self):
"""@return list of string SSIDs with at least one BSS we've scanned."""
return self._wifi_proxy.get_active_wifi_SSIDs()
@xmlrpc_server.dbus_safe(False)
def set_sched_scan(self, enable):
"""Configure scheduled scan.
@param enable bool flag indicating to enable/disable scheduled scan.
@return True on success, False otherwise.
"""
self._wifi_proxy.manager.set_sched_scan(enable)
return True
def enable_ui(self):
"""@return True iff the UI was successfully started."""
return cros_ui.start(allow_fail=True, wait_for_login_prompt=False) == 0
def sync_time_to(self, epoch_seconds):
"""Sync time on the DUT to |epoch_seconds| from the epoch.
@param epoch_seconds: float number of seconds from the epoch.
"""
utils.run('date -u --set=@%f' % epoch_seconds)
return True
@staticmethod
def do_suspend(seconds):
"""Suspend DUT using the power manager.
@param seconds: The number of seconds to suspend the device.
"""
return sys_power.do_suspend(seconds)
@staticmethod
def do_suspend_bg(seconds):
"""Suspend DUT using the power manager - non-blocking.
@param seconds int The number of seconds to suspend the device.
"""
process = multiprocessing.Process(target=sys_power.do_suspend,
args=(seconds, 1))
process.start()
return True
@xmlrpc_server.dbus_safe(None)
def get_dbus_property_on_device(self, wifi_interface, prop_name):
"""Get a property for the given WiFi device.
@param wifi_interface: string name of interface being queried.
@param prop_name: the name of the property.
@return the current value of the property.
"""
dbus_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if dbus_object is None:
return None
object_properties = dbus_object.GetProperties(utf8_strings=True)
if prop_name not in object_properties:
return None
return self._wifi_proxy.dbus2primitive(
object_properties[prop_name])
@xmlrpc_server.dbus_safe(False)
def set_dbus_property_on_device(self, wifi_interface, prop_name, value):
"""Set a property on the given WiFi device.
@param wifi_interface: the device to set a property for.
@param prop_name: the name of the property.
@param value: the desired value of the property.
@return True if successful, False otherwise.
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return False
shill_proxy.ShillProxy.set_dbus_property(device_object,
prop_name,
value)
return True
@xmlrpc_server.dbus_safe(False)
def request_roam_dbus(self, bssid, interface):
"""Request that we roam to the specified BSSID.
Note that this operation assumes that:
1) We're connected to an SSID for which |bssid| is a member.
2) There is a BSS with an appropriate ID in our scan results.
@param bssid: string BSSID of BSS to roam to.
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': interface})
if device_object is None:
return False
device_object.RequestRoam(bssid)
return True
@xmlrpc_server.dbus_safe(False)
def set_device_enabled(self, wifi_interface, enabled):
"""Enable or disable the WiFi device.
@param wifi_interface: string name of interface being modified.
@param enabled: boolean; true if this device should be enabled,
false if this device should be disabled.
@return True if it worked; false, otherwise
"""
interface = {'Name': wifi_interface}
dbus_object = self._wifi_proxy.find_object(self.DBUS_DEVICE,
interface)
if dbus_object is None:
return False
if enabled:
dbus_object.Enable()
else:
dbus_object.Disable()
return True
def discover_tdls_link(self, wifi_interface, peer_mac_address):
"""Send a TDLS Discover to |peer_mac_address| on |wifi_interface|.
@param wifi_interface: string name of interface to send the discover on.
@param peer_mac_address: string mac address of the TDLS peer device.
@return True if it the operation was initiated; False otherwise
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return False
device_object.PerformTDLSOperation('Discover', peer_mac_address)
return True
def establish_tdls_link(self, wifi_interface, peer_mac_address):
"""Establish a TDLS link with |peer_mac_address| on |wifi_interface|.
@param wifi_interface: string name of interface to establish a link on.
@param peer_mac_address: string mac address of the TDLS peer device.
@return True if it the operation was initiated; False otherwise
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return False
device_object.PerformTDLSOperation('Setup', peer_mac_address)
return True
@xmlrpc_server.dbus_safe(False)
def query_tdls_link(self, wifi_interface, peer_mac_address):
"""Query the TDLS link with |peer_mac_address| on |wifi_interface|.
@param wifi_interface: string name of interface to establish a link on.
@param peer_mac_address: string mac address of the TDLS peer device.
@return string indicating the current TDLS link status.
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return None
return self._wifi_proxy.dbus2primitive(
device_object.PerformTDLSOperation('Status', peer_mac_address))
@xmlrpc_server.dbus_safe(False)
def add_wake_packet_source(self, wifi_interface, source_ip):
"""Set up the NIC to wake on packets from the given source IP.
@param wifi_interface: string name of interface to establish WoWLAN on.
@param source_ip: string IP address of packet source, i.e. "127.0.0.1"
@return True on success, False otherwise.
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return False
device_object.AddWakeOnPacketConnection(source_ip)
return True
@xmlrpc_server.dbus_safe(False)
def remove_wake_packet_source(self, wifi_interface, source_ip):
"""Stop waking on packets from the given source IP.
@param wifi_interface: string name of interface to establish WoWLAN on.
@param source_ip: string IP address of packet source, i.e. "127.0.0.1"
@return True on success, False otherwise.
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return False
device_object.RemoveWakeOnPacketConnection(source_ip)
return True
@xmlrpc_server.dbus_safe(False)
def remove_all_wake_packet_sources(self, wifi_interface):
"""Stop waking on packets from any IP.
@param wifi_interface: string name of interface to establish WoWLAN on.
@return True on success, False otherwise.
"""
device_object = self._wifi_proxy.find_object(
self.DBUS_DEVICE, {'Name': wifi_interface})
if device_object is None:
return False
device_object.RemoveAllWakeOnPacketConnections()
return True
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
handler = logging.handlers.SysLogHandler(address = '/dev/log')
formatter = logging.Formatter(
'shill_xmlrpc_server: [%(levelname)s] %(message)s')
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
logging.debug('shill_xmlrpc_server main...')
server = xmlrpc_server.XmlRpcServer('localhost',
constants.SHILL_XMLRPC_SERVER_PORT)
server.register_delegate(ShillXmlRpcDelegate())
server.run()