# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import copy
import logging
import re
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.network import iw_event_logger
# Per-channel flag strings.  These must mirror the values in 'iw list'
# output, where they appear in parentheses after a channel's frequency
# (see the channel examples in IwRunner.list_phys).
CHAN_FLAG_DISABLED = 'disabled'
CHAN_FLAG_NO_IR = 'no IR'
CHAN_FLAG_PASSIVE_SCAN = 'passive scan'
CHAN_FLAG_RADAR_DETECT = 'radar detection'
# Interface mode / type names.
# NOTE(review): not referenced in this file; presumably used by callers as
# interface types or to test IwPhy.modes -- confirm against callers.
DEV_MODE_AP = 'AP'
DEV_MODE_IBSS = 'IBSS'
DEV_MODE_MONITOR = 'monitor'
# HT channel width/offset names; these are the values stored in HT_TABLE.
HT20 = 'HT20'
HT40_ABOVE = 'HT40+'
HT40_BELOW = 'HT40-'
# Security names reported in IwBss.security (see
# IwRunner.determine_security).
SECURITY_OPEN = 'open'
SECURITY_WEP = 'wep'
SECURITY_WPA = 'wpa'
SECURITY_WPA2 = 'wpa2'
# Mixed mode security is WPA2/WPA
SECURITY_MIXED = 'mixed'
# Table of lookups between the output of item 'secondary channel offset:' from
# iw <device> scan to constants.
HT_TABLE = {'no secondary': HT20,
'above': HT40_ABOVE,
'below': HT40_BELOW}
# The fields for IwBand are as follows (as filled in by IwRunner.list_phys):
#   num: int band number taken from the 'Band <n>:' section header.
#   frequencies: sequence of int channel frequencies in MHz.
#   frequency_flags: dict mapping each frequency to a frozenset of flag
#           strings (empty frozenset when the channel has no flags).
#   mcs_indices: sequence of int HT MCS rate indexes supported.
# Note that the namedtuple's type name is 'Band', not 'IwBand'.
IwBand = collections.namedtuple(
'Band', ['num', 'frequencies', 'frequency_flags', 'mcs_indices'])
# The fields for IwBss are as follows (as filled in by
# IwRunner._parse_scan_results):
#   bss: string BSSID from the 'BSS <addr>' line.
#   frequency: int value of the 'freq:' line.
#   ssid: string value of the 'SSID:' line.
#   security: one of the SECURITY_* constants above.
#   ht: a value from HT_TABLE.
#   signal: float first field of the 'signal:' line.
# Fields whose line is missing from the scan output are None.
IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security',
'ht', 'signal'])
# The fields for IwNetDev are as follows (as filled in by
# IwRunner.list_interfaces):
#   phy: string name of the owning phy, such as "phy0".
#   if_name: string interface name, such as "wlan0".
#   if_type: string interface type, such as "managed".
IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type'])
# The fields for IwTimedScan are as follows (see IwRunner.timed_scan):
#   time: float 'real' time in seconds reported by 'time -p' for the scan.
#   bss_list: list of IwBss tuples found by the scan.
IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list'])
# The fields for IwPhy are as follows:
# name: string name of the phy, such as "phy0"
# bands: list of IwBand objects.
# modes: List of strings containing interface modes supported, such as "AP".
# commands: List of strings containing nl80211 commands supported, such as
# "authenticate".
# features: List of strings containing nl80211 features supported, such as
# "T-DLS".
# max_scan_ssids: Maximum number of SSIDs which can be scanned at once.
# avail_tx_antennas: int TX value of the 'Available Antennas' line, parsed
# as hexadecimal; 0 if the line is absent.
# avail_rx_antennas: int RX value of the 'Available Antennas' line, parsed
# as hexadecimal; 0 if the line is absent.
# supports_setting_antenna_mask: truthy iff both antenna values above are
# non-zero.
# support_vht: True iff a 'VHT Capabilities' section was seen for the phy.
# Note that the namedtuple's type name is 'Phy', not 'IwPhy'.
IwPhy = collections.namedtuple(
'Phy', ['name', 'bands', 'modes', 'commands', 'features',
'max_scan_ssids', 'avail_tx_antennas', 'avail_rx_antennas',
'supports_setting_antenna_mask', 'support_vht'])
# Default executable used by IwRunner when no command_iw is given.
DEFAULT_COMMAND_IW = 'iw'
# Redirect stderr to stdout on Cros since adb commands cannot distinguish them
# on Brillo.
IW_TIME_COMMAND_FORMAT = '(time -p %s) 2>&1'
# Prefix of the first line of 'time -p' output ("real <seconds>").
IW_TIME_COMMAND_OUTPUT_START = 'real'
# Keys of 'iw dev <if> link' output accepted by IwRunner.get_link_value.
IW_LINK_KEY_BEACON_INTERVAL = 'beacon int'
IW_LINK_KEY_DTIM_PERIOD = 'dtim period'
IW_LINK_KEY_FREQUENCY = 'freq'
# Path template for event logs; %d is filled with a per-IwRunner counter
# (see IwRunner.get_event_logger).
IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log'
class IwRunner(object):
"""Defines an interface to the 'iw' command."""
def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW):
    """Construct an IwRunner.

    @param remote_host: optional host object; when truthy, commands are
            executed through its run() method instead of utils.run.
    @param command_iw: string name or path of the iw executable.

    """
    self._host = remote_host
    self._command_iw = command_iw
    # Counter used to give every event logger its own log file.
    self._log_id = 0
    # Run locally unless a remote host was supplied.
    runner = utils.run
    if remote_host:
        runner = remote_host.run
    self._run = runner
def _parse_scan_results(self, output):
    """Parse the output of the 'scan' and 'scan dump' commands.

    Here is an example of what a single network would look like for
    the input parameter.  Some fields have been removed in this example:
      BSS 00:11:22:33:44:55(on wlan0)
      freq: 2447
      beacon interval: 100 TUs
      signal: -46.00 dBm
      Information elements from Probe Response frame:
      SSID: my_open_network
      Extended supported rates: 24.0 36.0 48.0 54.0
      HT capabilities:
      Capabilities: 0x0c
      HT20
      HT operation:
      * primary channel: 8
      * secondary channel offset: no secondary
      * STA channel width: 20 MHz
      RSN: * Version: 1
      * Group cipher: CCMP
      * Pairwise ciphers: CCMP
      * Authentication suites: PSK
      * Capabilities: 1-PTKSA-RC 1-GTKSA-RC (0x0000)

    @param output: string command output.

    @returns a list of IwBss namedtuples, one per 'BSS' stanza in
            |output|; an empty list if |output| contains no stanza.

    """
    bss_list = []
    bss = None
    frequency = None
    ssid = None
    ht = None
    signal = None
    supported_securities = []

    for line in output.splitlines():
        line = line.strip()
        bss_match = re.match('BSS ([0-9a-f:]+)', line)
        if bss_match:
            # A new stanza starts: emit the one collected so far.
            if bss is not None:
                security = self.determine_security(supported_securities)
                bss_list.append(
                        IwBss(bss, frequency, ssid, security, ht, signal))
            # Reset every per-BSS field (signal included) so that nothing
            # leaks from one stanza into the next.
            frequency = ssid = ht = signal = None
            supported_securities = []
            bss = bss_match.group(1)
        elif line.startswith('freq:'):
            frequency = int(line.split()[1])
        elif line.startswith('signal:'):
            signal = float(line.split()[1])
        elif line.startswith('SSID: '):
            _, ssid = line.split(': ', 1)
        elif line.startswith('* secondary channel offset'):
            # Unknown offsets map to None rather than aborting the parse.
            ht = HT_TABLE.get(line.split(':')[1].strip())
        elif line.startswith('WPA'):
            supported_securities.append(SECURITY_WPA)
        elif line.startswith('RSN'):
            supported_securities.append(SECURITY_WPA2)

    # Emit the final stanza, but only if at least one BSS line was seen.
    if bss is not None:
        security = self.determine_security(supported_securities)
        bss_list.append(IwBss(bss, frequency, ssid, security, ht, signal))
    return bss_list
def _parse_scan_time(self, output):
    """
    Parse the scan time in seconds from the output of the 'time -p "scan"'
    command.

    'time -p' Command output format is below:
      real     0.01
      user     0.01
      sys      0.00

    @param output: string command output.

    @returns float time in seconds.

    @raises error.TestFail if no real/user/sys triple is found.

    """
    output_lines = [line.strip() for line in output.splitlines()]
    # A 'real' line only counts when the 'user' and 'sys' lines follow it,
    # so candidates stop two lines before the end.  This also keeps
    # truncated output from raising IndexError.
    for line_num in range(len(output_lines) - 2):
        line = output_lines[line_num]
        if (line.startswith(IW_TIME_COMMAND_OUTPUT_START) and
                output_lines[line_num + 1].startswith('user') and
                output_lines[line_num + 2].startswith('sys')):
            return float(line.split()[1])
    raise error.TestFail('Could not parse scan time.')
def add_interface(self, phy, interface, interface_type):
    """
    Create a new interface on a WiFi PHY.

    @param phy: string name of PHY to add an interface to.
    @param interface: string name of interface to add.
    @param interface_type: string type of interface to add (e.g. 'monitor').

    """
    command = '%s phy %s interface add %s type %s' % (
            self._command_iw, phy, interface, interface_type)
    self._run(command)
def disconnect_station(self, interface):
    """
    Issue 'disconnect' on a station interface.

    @param interface: string name of interface to disconnect.

    """
    command = '%s dev %s disconnect' % (self._command_iw, interface)
    self._run(command)
def get_current_bssid(self, interface_name):
    """Look up the BSSID that |interface_name| is currently linked to.

    @param interface_name: string name of interface (e.g. 'wlan0').

    @return string bssid of our current association, or None when the
            link command fails or reports no association.

    """
    link = self._run(
            '%s dev %s link' % (self._command_iw, interface_name),
            ignore_status=True)
    if link.exit_status:
        # The link query itself failed, so there is no BSSID to report.
        return None

    # We're looking for a line like:
    #   Connected to 04:f0:21:03:7d:bb (on wlan0)
    pattern = 'Connected to ([0-9a-fA-F:]{17}) \\(on %s\\)' % interface_name
    found = re.search(pattern, link.stdout)
    return found.group(1) if found is not None else None
def get_interface(self, interface_name):
    """Look up the IwNetDev entry for a single named interface.

    @param interface_name: string name of interface (e.g. 'wlan0').

    @return IwNetDev tuple.

    @raises error.TestFail unless exactly one interface has that name.

    """
    candidates = []
    for netdev in self.list_interfaces():
        if netdev.if_name == interface_name:
            candidates.append(netdev)

    if len(candidates) == 1:
        return candidates[0]

    raise error.TestFail('Could not find interface named %s' %
                         interface_name)
def get_link_value(self, interface, iw_link_key):
    """Get the value of a link property for |interface|.

    This command parses fields of iw link:

    #> iw dev wlan0 link
    Connected to 74:e5:43:10:4f:c0 (on wlan0)
          SSID: PMKSACaching_4m9p5_ch1
          freq: 5220
          RX: 5370 bytes (37 packets)
          TX: 3604 bytes (15 packets)
          signal: -59 dBm
          tx bitrate: 13.0 MBit/s MCS 1

          bss flags:      short-slot-time
          dtim period:    5
          beacon int:     100

    @param interface: string name of interface (e.g. 'wlan0').
    @param iw_link_key: string one of IW_LINK_KEY_* defined above.

    @return string value of the first line whose key is |iw_link_key|,
            or None if the command fails or the key is not present.

    """
    result = self._run('%s dev %s link' % (self._command_iw, interface),
                       ignore_status=True)
    if result.exit_status:
        # When roaming, there is a period of time for mac80211 based drivers
        # when the driver is 'associated' with an SSID but not a particular
        # BSS.  This causes iw to return an error code (-2) when attempting
        # to retrieve information specific to the BSS.  This does not happen
        # in mwifiex drivers.
        return None

    # The key is matched literally; escape it so that it cannot be
    # interpreted as regex syntax.
    find_re = re.compile(r'\s*%s:\s*(.*\S)\s*$' % re.escape(iw_link_key))
    # Walk the lines explicitly instead of indexing the result of filter(),
    # which is a list on Python 2 but a lazy iterator on Python 3.
    for line in result.stdout.splitlines():
        match = find_re.match(line)
        if match:
            actual_value = match.group(1)
            logging.info('Found iw link key %s with value %s.',
                         iw_link_key, actual_value)
            return actual_value

    return None
def ibss_join(self, interface, ssid, frequency):
    """
    Make a WiFi interface join an IBSS.

    @param interface: string name of interface to join to the IBSS.
    @param ssid: string SSID of IBSS to join.
    @param frequency: int frequency of IBSS in Mhz.

    """
    command = '%s dev %s ibss join %s %d' % (
            self._command_iw, interface, ssid, frequency)
    self._run(command)
def ibss_leave(self, interface):
    """
    Make a WiFi interface leave its IBSS.

    @param interface: string name of interface to remove from the IBSS.

    """
    command = '%s dev %s ibss leave' % (self._command_iw, interface)
    self._run(command)
def list_interfaces(self, desired_if_type=None):
    """Enumerate the WiFi interfaces reported by 'iw dev'.

    The output being parsed looks like this:

      $ adb shell iw dev
      phy#0
        Unnamed/non-netdev interface
          wdev 0x2
          addr aa:bb:cc:dd:ee:ff
          type P2P-device
        Interface wlan0
          ifindex 4
          wdev 0x1
          addr aa:bb:cc:dd:ee:ff
          ssid Whatever
          type managed

    @param desired_if_type: string type of interface to filter
            our returned list of interfaces for (e.g. 'managed').

    @return list of IwNetDev tuples.

    """
    dev_output = self._run('%s dev' % self._command_iw).stdout

    found = []
    current_phy = None
    current_name = None
    for row in dev_output.splitlines():
        phy_match = re.match('phy#([0-9]+)', row)
        if phy_match:
            # A new phy stanza; forget any interface seen under the old one.
            current_phy = 'phy%d' % int(phy_match.group(1))
            current_name = None
        elif current_phy:
            name_match = re.match('[\s]*Interface (.*)', row)
            if name_match:
                current_name = name_match.group(1)
            elif current_name:
                # Only 'type' lines that follow a named interface count;
                # unnamed/non-netdev entries are skipped.
                # Common values for type are 'managed', 'monitor', and
                # 'IBSS'.
                type_match = re.match('[\s]*type ([a-zA-Z]+)', row)
                if type_match:
                    found.append(IwNetDev(phy=current_phy,
                                          if_name=current_name,
                                          if_type=type_match.group(1)))
                    # One phy may have many interfaces, so keep the phy and
                    # only wait for the next interface name.
                    current_name = None

    if not desired_if_type:
        return found

    return [netdev for netdev in found
            if netdev.if_type == desired_if_type]
def list_phys(self):
    """
    List WiFi PHYs on the given host.

    Runs 'iw list' and walks the output line by line.  A 'Wiphy <name>'
    line starts a new phy; lines ending in ':' start a new section, and a
    'Band <n>:' section starts a new band of the current phy.

    @return list of IwPhy tuples, in the order they appear in the output.

    """
    output = self._run('%s list' % self._command_iw).stdout

    # State of the phy currently being assembled.  It is (re)initialized
    # on every 'Wiphy' line; initializing it here as well keeps the parser
    # well defined for output that does not start with a 'Wiphy' line.
    pending_phy_name = None
    pending_phy_bands = []
    pending_phy_modes = []
    pending_phy_commands = []
    pending_phy_features = []
    pending_phy_max_scan_ssids = None
    pending_phy_tx_antennas = 0
    pending_phy_rx_antennas = 0
    pending_phy_support_vht = False
    current_band = None
    current_section = None
    all_phys = []

    def add_pending_phy():
        """Add the pending phy into |all_phys|."""
        # Copy the mutable per-band containers into immutable ones.
        bands = tuple(IwBand(band.num,
                             tuple(band.frequencies),
                             dict(band.frequency_flags),
                             tuple(band.mcs_indices))
                      for band in pending_phy_bands)
        new_phy = IwPhy(pending_phy_name,
                        bands,
                        tuple(pending_phy_modes),
                        tuple(pending_phy_commands),
                        tuple(pending_phy_features),
                        pending_phy_max_scan_ssids,
                        pending_phy_tx_antennas,
                        pending_phy_rx_antennas,
                        # The antenna mask is settable only when both
                        # bitmaps are non-zero; report a real boolean.
                        bool(pending_phy_tx_antennas and
                             pending_phy_rx_antennas),
                        pending_phy_support_vht)
        all_phys.append(new_phy)

    for line in output.splitlines():
        match_phy = re.search(r'Wiphy (.*)', line)
        if match_phy:
            if pending_phy_name:
                add_pending_phy()
            pending_phy_name = match_phy.group(1)
            pending_phy_bands = []
            pending_phy_modes = []
            pending_phy_commands = []
            pending_phy_features = []
            pending_phy_max_scan_ssids = None
            pending_phy_tx_antennas = 0
            pending_phy_rx_antennas = 0
            pending_phy_support_vht = False
            # Do not let the previous phy's band or section swallow the
            # first lines of this phy.
            current_band = None
            current_section = None
            continue

        match_section = re.match(r'\s*(\w.*):\s*$', line)
        if match_section:
            current_section = match_section.group(1)
            match_band = re.match(r'Band (\d+)', current_section)
            if match_band:
                current_band = IwBand(num=int(match_band.group(1)),
                                      frequencies=[],
                                      frequency_flags={},
                                      mcs_indices=[])
                pending_phy_bands.append(current_band)
            continue

        # Check for max_scan_ssids.  This isn't a section, but it
        # also isn't within a section.
        match_max_scan_ssids = re.match(r'\s*max # scan SSIDs: (\d+)',
                                        line)
        if match_max_scan_ssids and pending_phy_name:
            pending_phy_max_scan_ssids = int(
                    match_max_scan_ssids.group(1))
            continue

        if (current_section == 'Supported interface modes' and
                pending_phy_name):
            mode_match = re.search(r'\* (\w+)', line)
            if mode_match:
                pending_phy_modes.append(mode_match.group(1))
                continue

        if current_section == 'Supported commands' and pending_phy_name:
            command_match = re.search(r'\* (\w+)', line)
            if command_match:
                pending_phy_commands.append(command_match.group(1))
                continue

        # Any line inside a 'VHT Capabilities ...' section marks the phy
        # as VHT capable; the content of the line itself is not parsed.
        if (current_section is not None and
                current_section.startswith('VHT Capabilities') and
                pending_phy_name):
            pending_phy_support_vht = True
            continue

        match_avail_antennas = re.match(
                r'\s*Available Antennas: TX (\S+) RX (\S+)', line)
        if match_avail_antennas and pending_phy_name:
            # The bitmaps are parsed as hexadecimal.
            pending_phy_tx_antennas = int(
                    match_avail_antennas.group(1), 16)
            pending_phy_rx_antennas = int(
                    match_avail_antennas.group(2), 16)
            continue

        match_device_support = re.match(r'\s*Device supports (.*)\.', line)
        if match_device_support and pending_phy_name:
            pending_phy_features.append(match_device_support.group(1))
            continue

        # Everything below is per-band information, which only makes sense
        # on tab-indented lines once a phy and a band have been seen.
        if not all([current_band, pending_phy_name,
                    line.startswith('\t')]):
            continue

        # E.g.
        # * 2412 MHz [1] (20.0 dBm)
        # * 2467 MHz [12] (20.0 dBm) (passive scan)
        # * 2472 MHz [13] (disabled)
        # * 5260 MHz [52] (19.0 dBm) (no IR, radar detection)
        match_chan_info = re.search(
                r'(?P<frequency>\d+) MHz'
                r' (?P<chan_num>\[\d+\])'
                r'(?: \((?P<tx_power_limit>[0-9.]+ dBm)\))?'
                r'(?: \((?P<flags>[a-zA-Z, ]+)\))?', line)
        if match_chan_info:
            frequency = int(match_chan_info.group('frequency'))
            current_band.frequencies.append(frequency)
            flags_string = match_chan_info.group('flags')
            if flags_string:
                # Flags are separated by ', '; strip each one so that the
                # values compare equal to the CHAN_FLAG_* constants.
                current_band.frequency_flags[frequency] = frozenset(
                        flag.strip() for flag in flags_string.split(','))
            else:
                # Populate the dict with an empty set, to make
                # things uniform for client code.
                current_band.frequency_flags[frequency] = frozenset()
            continue

        # re_mcs needs to match something like:
        # HT TX/RX MCS rate indexes supported: 0-15, 32
        if re.search('HT TX/RX MCS rate indexes supported: ', line):
            rate_string = line.split(':')[1].strip()
            for piece in rate_string.split(','):
                if not piece.strip():
                    # Tolerate an empty list or a stray comma.
                    continue
                if piece.find('-') > 0:
                    # Must be a range like '  0-15'
                    begin, end = piece.split('-')
                    current_band.mcs_indices.extend(
                            range(int(begin), int(end) + 1))
                else:
                    # Must be a single rate like '32   '
                    current_band.mcs_indices.append(int(piece))

    if pending_phy_name:
        add_pending_phy()
    return all_phys
def remove_interface(self, interface, ignore_status=False):
    """
    Delete a WiFi interface.

    @param interface: string name of interface (e.g. mon0)
    @param ignore_status: boolean True iff we should ignore failures
            to remove the interface.

    """
    command = '%s dev %s del' % (self._command_iw, interface)
    self._run(command, ignore_status=ignore_status)
def determine_security(self, supported_securities):
    """Collapse the securities seen for one BSS into a single value.

    @param supported_securities: list of supported securities from scan

    @returns SECURITY_OPEN for an empty list, the sole entry for a list of
            one, and SECURITY_MIXED for anything longer.

    """
    if not supported_securities:
        return SECURITY_OPEN
    if len(supported_securities) == 1:
        return supported_securities[0]
    return SECURITY_MIXED
def scan(self, interface, frequencies=(), ssids=()):
    """Run a scan and return only the networks it found.

    @param interface: the interface to run the iw command against
    @param frequencies: list of int frequencies in Mhz to scan.
    @param ssids: list of string SSIDs to send probe requests for.

    @returns a list of IwBss namedtuples; None if the scan fails

    """
    timed = self.timed_scan(interface, frequencies, ssids)
    return None if timed is None else timed.bss_list
def timed_scan(self, interface, frequencies=(), ssids=()):
    """Run a scan under 'time -p' and report both results and duration.

    @param interface: the interface to run the iw command against
    @param frequencies: list of int frequencies in Mhz to scan.
    @param ssids: list of string SSIDs to send probe requests for.

    @returns a IwTimedScan namedtuple; None if the scan fails

    @raises error.TestFail if the command succeeds but prints nothing.

    """
    # Assemble "<iw> dev <if> scan [freq f1 f2 ...] [ssid "a" "b" ...]".
    iw_command = '%s dev %s scan' % (self._command_iw, interface)
    if frequencies:
        iw_command += ' freq %s' % ' '.join(
                str(frequency) for frequency in frequencies)
    if ssids:
        iw_command += ' ssid "%s"' % '" "'.join(ssids)

    scan = self._run(IW_TIME_COMMAND_FORMAT % iw_command,
                     ignore_status=True)
    if scan.exit_status != 0:
        # The device was busy
        logging.debug('scan exit_status: %d', scan.exit_status)
        return None

    if not scan.stdout:
        raise error.TestFail('Missing scan parse time')

    # When the timing block is the very first thing printed, the scan
    # itself produced no output, i.e. no BSS was found.
    if scan.stdout.startswith(IW_TIME_COMMAND_OUTPUT_START):
        logging.debug('Empty scan result')
        bss_list = []
    else:
        bss_list = self._parse_scan_results(scan.stdout)

    return IwTimedScan(self._parse_scan_time(scan.stdout), bss_list)
def scan_dump(self, interface):
    """Return the current contents of the scan cache.

    Note that this does not trigger a scan.  Instead, it returns
    the kernel's idea of what BSS's are currently visible.

    @param interface: the interface to run the iw command against

    @returns the list of IwBss namedtuples parsed from the dump.

    """
    command = '%s dev %s scan dump' % (self._command_iw, interface)
    dump = self._run(command)
    return self._parse_scan_results(dump.stdout)
def set_tx_power(self, interface, power):
    """
    Configure the transmission power of an interface.

    @param interface: string name of interface to set Tx power on.
    @param power: string power parameter. (e.g. 'auto').

    """
    command = '%s dev %s set txpower %s' % (
            self._command_iw, interface, power)
    self._run(command)
def set_freq(self, interface, freq):
    """
    Tune an interface to a frequency.

    @param interface: string name of interface to set frequency on.
    @param freq: int frequency

    """
    command = '%s dev %s set freq %d' % (self._command_iw, interface, freq)
    self._run(command)
def set_regulatory_domain(self, domain_string):
    """
    Request a new regulatory domain for the current machine.  Note that
    the regulatory change happens asynchronously to the exit of
    this function.

    @param domain_string: string regulatory domain name (e.g. 'US').

    """
    command = '%s reg set %s' % (self._command_iw, domain_string)
    self._run(command)
def get_regulatory_domain(self):
    """
    Get the regulatory domain of the current machine.

    The domain is taken from the first line of 'iw reg get' output that
    starts with 'country XX:'.  The line does not have to be the first
    line of the output, since some versions of iw print a 'global' header
    line before it.

    @returns a string containing the 2-letter regulatory domain name
            (e.g. 'US'); None if no country line is found.

    """
    output = self._run('%s reg get' % self._command_iw).stdout
    # MULTILINE lets '^' anchor at the start of any line, not only at the
    # start of the whole output.
    m = re.search(r'^country (..):', output, re.MULTILINE)
    if not m:
        return None
    return m.group(1)
def wait_for_scan_result(self, interface, bsses=(), ssids=(),
                         timeout_seconds=30, wait_for_all=False):
    """Returns a list of IWBSS objects for given list of bsses or ssids.

    This method scans repeatedly until the timeout expires.  After each
    successful scan it collects the networks of that scan whose bss or
    ssid was asked for.  With wait_for_all False it returns as soon as a
    scan contains at least one match; with wait_for_all True it returns
    the matches of the current scan once every requested bss and ssid has
    been seen in some scan since the call started.

    @param interface: which interface to run iw against
    @param bsses: a list of BSS strings
    @param ssids: a list of ssid strings
    @param timeout_seconds: the amount of time to wait in seconds
    @param wait_for_all: True to wait for all listed bsses or ssids; False
                         to return if any of the networks were found

    @returns a list of IwBss collections that contain the given bss or ssid;
        an empty list if the timeout expires without the requested
        networks being found; None if scans keep failing or coming back
        empty.

    """
    start_time = time.time()
    scan_failure_attempts = 0
    logging.info('Performing a scan with a max timeout of %d seconds.',
                 timeout_seconds)

    wanted_bsses = frozenset(bsses)
    wanted_ssids = frozenset(ssids)
    # Sets accept any iterable (including the default tuples) and allow a
    # network that shows up in several scans to be discarded repeatedly
    # without raising.
    remaining_bsses = set(wanted_bsses)
    remaining_ssids = set(wanted_ssids)

    while time.time() - start_time < timeout_seconds:
        scan_results = self.scan(interface)
        if not scan_results:
            scan_failure_attempts += 1
            # Allow in-progress scan to complete
            time.sleep(5)
            # If the in-progress scan takes more than 30 seconds to
            # complete it will most likely never complete; abort.
            # See crbug.com/309148.
            if scan_failure_attempts > 5:
                logging.error('Scan failed to run, see debug log for '
                              'error code.')
                return None
            continue
        scan_failure_attempts = 0

        matching_iwbsses = set()
        for iwbss in scan_results:
            if iwbss.bss in wanted_bsses:
                remaining_bsses.discard(iwbss.bss)
                matching_iwbsses.add(iwbss)
            if iwbss.ssid in wanted_ssids:
                remaining_ssids.discard(iwbss.ssid)
                matching_iwbsses.add(iwbss)

        if wait_for_all:
            if not remaining_bsses and not remaining_ssids:
                return list(matching_iwbsses)
        elif matching_iwbsses:
            return list(matching_iwbsses)

    if scan_failure_attempts > 0:
        return None
    # The SSID wasn't found, but the device is fine.
    return list()
def wait_for_link(self, interface, timeout_seconds=10):
    """Poll |interface| once a second until it reports a link.

    @param interface: which interface to run iw against.
    @param timeout_seconds: the amount of time to wait in seconds.

    @returns True if link was established before the timeout.

    """
    link_command = '%s dev %s link' % (self._command_iw, interface)
    began = time.time()
    while time.time() - began < timeout_seconds:
        # Any output that lacks 'Not connected' is taken as a link.
        if 'Not connected' not in self._run(link_command).stdout:
            return True
        time.sleep(1)
    return False
def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap):
    """Configure the antenna chain mask of a phy (radio).

    This restricts the antennas that may be used for TX and RX on
    |phy| to the ones set in |tx_bitmap| and |rx_bitmap|.
    This command is only allowed when the interfaces on the phy are down.

    @param phy: phy name
    @param tx_bitmap: bitmap of allowed antennas to use for TX
    @param rx_bitmap: bitmap of allowed antennas to use for RX

    """
    self._run('%s phy %s set antenna %d %d' %
              (self._command_iw, phy, tx_bitmap, rx_bitmap))
def get_event_logger(self):
    """Build a new IwEventLogger with its own local log file.

    Every call uses the next value of an internal counter in the log file
    name, so loggers created by the same runner do not share a file.

    @returns a IwEventLogger object.

    """
    log_path = IW_LOCAL_EVENT_LOG_FILE % self._log_id
    self._log_id = self._log_id + 1
    return iw_event_logger.IwEventLogger(
            self._host, self._command_iw, log_path)
def vht_supported(self):
    """Returns True if 'iw list' mentions VHT capabilities; else False."""
    listing = self._run('%s list' % self._command_iw).stdout
    return 'VHT Capabilities' in listing
def frequency_supported(self, frequency):
    """Returns True if any band of any phy lists |frequency|; else False.

    @param frequency: int Wifi frequency to check if it is supported by
                      DUT.

    """
    return any(frequency in band.frequencies
               for phy in self.list_phys()
               for band in phy.bands)