"""Convenience functions for use by network tests or whomever.
This library is to release in the public repository.
"""
import commands, os, re, socket, sys, time, struct
from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils as bin_utils
TIMEOUT = 10 # Used for socket timeout and barrier timeout
class network_utils(object):
def reset(self, ignore_status=False):
bin_utils.system('service network restart', ignore_status=ignore_status)
def start(self, ignore_status=False):
bin_utils.system('service network start', ignore_status=ignore_status)
def stop(self, ignore_status=False):
bin_utils.system('service network stop', ignore_status=ignore_status)
def list(self):
bin_utils.system('ifconfig -a')
def get_ip_local(self, query_ip, netmask="24"):
"""
Get ip address in local system which can communicate with query_ip.
@param query_ip: IP of client which wants to communicate with
autotest machine.
@return: IP address which can communicate with query_ip
"""
ip = bin_utils.system_output("ip addr show to %s/%s" %
(query_ip, netmask))
ip = re.search(r"inet ([0-9.]*)/",ip)
if ip is None:
return ip
return ip.group(1)
def disable_ip_local_loopback(self, ignore_status=False):
bin_utils.system(
"echo '1' > /proc/sys/net/ipv4/route/no_local_loopback",
ignore_status=ignore_status)
bin_utils.system('echo 1 > /proc/sys/net/ipv4/route/flush',
ignore_status=ignore_status)
def enable_ip_local_loopback(self, ignore_status=False):
bin_utils.system(
"echo '0' > /proc/sys/net/ipv4/route/no_local_loopback",
ignore_status=ignore_status)
bin_utils.system('echo 1 > /proc/sys/net/ipv4/route/flush',
ignore_status=ignore_status)
def process_mpstat(self, mpstat_out, sample_count, loud = True):
"""Parses mpstat output of the following two forms:
02:10:17 0 0.00 0.00 0.00 0.00 0.00 0.00 \
0.00 100.00 1012.87
02:10:13 PM 0 0.00 0.00 0.00 0.00 0.00 0.00 \
0.00 100.00 1019.00
"""
mpstat_keys = ['time', 'CPU', 'user', 'nice', 'sys', 'iowait', 'irq',
'soft', 'steal', 'idle', 'intr/s']
if loud:
print mpstat_out
# Remove the optional AM/PM appearing in time format
mpstat_out = mpstat_out.replace('AM', '')
mpstat_out = mpstat_out.replace('PM', '')
regex = re.compile('(\S+)')
stats = []
for line in mpstat_out.splitlines()[3:]:
match = regex.findall(line)
# Skip the "Average" computed by mpstat. We are gonna compute the
# average ourself. Pick only the aggregate 'all' CPU results
if match and match[0] != 'Average:' and match[1] == 'all':
stats.append(dict(zip(mpstat_keys, match)))
if sample_count >= 5:
# Throw away first and last sample
stats = stats[1:-1]
cpu_stats = {}
for key in ['user', 'nice', 'sys', 'iowait', 'irq', 'soft', 'steal',
'idle', 'intr/s']:
x = [float(row[key]) for row in stats]
if len(x):
count = len(x)
else:
print 'net_utils.network_utils.process_mpstat: count is 0!!!\n'
count = 1
cpu_stats[key] = sum(x) / count
return cpu_stats
def network():
try:
from autotest_lib.client.bin.net import site_net_utils
return site_net_utils.network_utils()
except:
return network_utils()
class network_interface(object):
ENABLE, DISABLE = (True, False)
def __init__(self, name):
autodir = os.environ['AUTODIR']
self.ethtool = 'ethtool'
self._name = name
self.was_down = self.is_down()
self.orig_ipaddr = self.get_ipaddr()
self.was_loopback_enabled = self.is_loopback_enabled()
self._socket = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
self._socket.settimeout(TIMEOUT)
self._socket.bind((name, raw_socket.ETH_P_ALL))
def restore(self):
self.set_ipaddr(self.orig_ipaddr)
# TODO (msb): The additional conditional guard needs cleanup:
# Underlying driver should simply perform a noop
# for disabling loopback on an already-disabled device,
# instead of returning non-zero exit code.
# To avoid sending a RST to the autoserv ssh connection
# don't disable loopback until the IP address is restored.
if not self.was_loopback_enabled and self.is_loopback_enabled():
self.disable_loopback()
if self.was_down:
self.down()
def get_name(self):
return self._name
def parse_ethtool(self, field, match, option='', next_field=''):
output = bin_utils.system_output('%s %s %s' % (self.ethtool,
option, self._name))
if output:
match = re.search('\n\s*%s:\s*(%s)%s' %
(field, match, next_field), output, re.S)
if match:
return match.group(1)
return ''
def get_stats(self):
stats = {}
stats_path = '/sys/class/net/%s/statistics/' % self._name
for stat in os.listdir(stats_path):
f = open(stats_path + stat, 'r')
if f:
stats[stat] = int(f.read())
f.close()
return stats
def get_stats_diff(self, orig_stats):
stats = self.get_stats()
for stat in stats.keys():
if stat in orig_stats:
stats[stat] = stats[stat] - orig_stats[stat]
else:
stats[stat] = stats[stat]
return stats
def get_driver(self):
driver_path = os.readlink('/sys/class/net/%s/device/driver' %
self._name)
return os.path.basename(driver_path)
def get_carrier(self):
f = open('/sys/class/net/%s/carrier' % self._name)
if not f:
return ''
carrier = f.read().strip()
f.close()
return carrier
def get_supported_link_modes(self):
result = self.parse_ethtool('Supported link modes', '.*',
next_field='Supports auto-negotiation')
return result.split()
def get_advertised_link_modes(self):
result = self.parse_ethtool('Advertised link modes', '.*',
next_field='Advertised auto-negotiation')
return result.split()
def is_autoneg_advertised(self):
result = self.parse_ethtool('Advertised auto-negotiation',
'Yes|No')
return result == 'Yes'
def get_speed(self):
return int(self.parse_ethtool('Speed', '\d+'))
def is_full_duplex(self):
result = self.parse_ethtool('Duplex', 'Full|Half')
return result == 'Full'
def is_autoneg_on(self):
result = self.parse_ethtool('Auto-negotiation', 'on|off')
return result == 'on'
def get_wakeon(self):
return self.parse_ethtool('Wake-on', '\w+')
def is_rx_summing_on(self):
result = self.parse_ethtool('rx-checksumming', 'on|off', '-k')
return result == 'on'
def is_tx_summing_on(self):
result = self.parse_ethtool('tx-checksumming', 'on|off', '-k')
return result == 'on'
def is_scatter_gather_on(self):
result = self.parse_ethtool('scatter-gather', 'on|off', '-k')
return result == 'on'
def is_tso_on(self):
result = self.parse_ethtool('tcp segmentation offload',
'on|off', '-k')
return result == 'on'
def is_pause_autoneg_on(self):
result = self.parse_ethtool('Autonegotiate', 'on|off', '-a')
return result == 'on'
def is_tx_pause_on(self):
result = self.parse_ethtool('TX', 'on|off', '-a')
return result == 'on'
def is_rx_pause_on(self):
result = self.parse_ethtool('RX', 'on|off', '-a')
return result == 'on'
def _set_loopback(self, mode, enable_disable):
return bin_utils.system('%s -L %s %s %s' %
(self.ethtool, self._name, mode, enable_disable),
ignore_status=True)
def enable_loopback(self):
# If bonded do not set loopback mode.
# Try mac loopback first then phy loopback
# If both fail, raise an error
if bond().is_enabled():
raise error.TestError('Unable to enable loopback while '
'bonding is enabled.')
if (self._set_loopback('phyint', 'enable') > 0 and
self._set_loopback('mac', 'enable') > 0):
raise error.TestError('Unable to enable loopback')
# Add a 1 second wait for drivers which do not have
# a synchronous loopback enable
# TODO (msb); Remove this wait once the drivers are fixed
if self.get_driver() in ['tg3', 'bnx2x']:
time.sleep(1)
self.wait_for_carrier(timeout=30)
def disable_loopback(self):
# Try mac loopback first then phy loopback
# If both fail, raise an error
if (self._set_loopback('phyint', 'disable') > 0 and
self._set_loopback('mac', 'disable') > 0):
raise error.TestError('Unable to disable loopback')
def is_loopback_enabled(self):
# Don't try ethtool -l on a bonded host
if bond().is_enabled():
return False
output = bin_utils.system_output('%s -l %s'
% (self.ethtool, self._name))
if output:
return 'enabled' in output
return False
def enable_promisc(self):
bin_utils.system('ifconfig %s promisc' % self._name)
def disable_promisc(self):
bin_utils.system('ifconfig %s -promisc' % self._name)
def get_hwaddr(self):
f = open('/sys/class/net/%s/address' % self._name)
hwaddr = f.read().strip()
f.close()
return hwaddr
def set_hwaddr(self, hwaddr):
bin_utils.system('ifconfig %s hw ether %s' % (self._name, hwaddr))
def add_maddr(self, maddr):
bin_utils.system('ip maddr add %s dev %s' % (maddr, self._name))
def del_maddr(self, maddr):
bin_utils.system('ip maddr del %s dev %s' % (maddr, self._name))
def get_ipaddr(self):
ipaddr = "0.0.0.0"
output = bin_utils.system_output('ifconfig %s' % self._name)
if output:
match = re.search("inet addr:([\d\.]+)", output)
if match:
ipaddr = match.group(1)
return ipaddr
def set_ipaddr(self, ipaddr):
bin_utils.system('ifconfig %s %s' % (self._name, ipaddr))
def is_down(self):
output = bin_utils.system_output('ifconfig %s' % self._name)
if output:
return 'UP' not in output
return False
def up(self):
bin_utils.system('ifconfig %s up' % self._name)
def down(self):
bin_utils.system('ifconfig %s down' % self._name)
def wait_for_carrier(self, timeout=60):
while timeout and self.get_carrier() != '1':
timeout -= 1
time.sleep(1)
if timeout == 0:
raise error.TestError('Timed out waiting for carrier.')
def send(self, buf):
self._socket.send(buf)
def recv(self, len):
return self._socket.recv(len)
def flush(self):
self._socket.close()
self._socket = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
self._socket.settimeout(TIMEOUT)
self._socket.bind((self._name, raw_socket.ETH_P_ALL))
def netif(name):
try:
from autotest_lib.client.bin.net import site_net_utils
return site_net_utils.network_interface(name)
except:
return network_interface(name)
class bonding(object):
"""This class implements bonding interface abstraction."""
NO_MODE = 0
AB_MODE = 1
AD_MODE = 2
def is_enabled(self):
raise error.TestError('Undefined')
def is_bondable(self):
raise error.TestError('Undefined')
def enable(self):
raise error.TestError('Undefined')
def disable(self):
raise error.TestError('Undefined')
def get_mii_status(self):
return {}
def get_mode(self):
return bonding.NO_MODE
def wait_for_state_change(self):
"""Wait for bonding state change.
Wait up to 90 seconds to successfully ping the gateway.
This is to know when LACP state change has converged.
(0 seconds is 3x lacp timeout, use by protocol)
"""
netif('eth0').wait_for_carrier(timeout=60)
wait_time = 0
while wait_time < 100:
time.sleep(10)
if not bin_utils.ping_default_gateway():
return True
wait_time += 10
return False
def get_active_interfaces(self):
return []
def get_slave_interfaces(self):
return []
def bond():
try:
from autotest_lib.client.bin.net import site_net_utils
return site_net_utils.bonding()
except:
return bonding()
class raw_socket(object):
"""This class implements an raw socket abstraction."""
ETH_P_ALL = 0x0003 # Use for binding a RAW Socket to all protocols
SOCKET_TIMEOUT = 1
def __init__(self, iface_name):
"""Initialize an interface for use.
Args:
iface_name: 'eth0' interface name ('eth0, eth1,...')
"""
self._name = iface_name
self._socket = None
self._socket_timeout = raw_socket.SOCKET_TIMEOUT
socket.setdefaulttimeout(self._socket_timeout)
if self._name is None:
raise error.TestError('Invalid interface name')
def socket(self):
return self._socket
def socket_timeout(self):
"""Get the timeout use by recv_from"""
return self._socket_timeout
def set_socket_timeout(self, timeout):
"""Set the timeout use by recv_from.
Args:
timeout: time in seconds
"""
self._socket_timeout = timeout
def open(self, protocol=None):
"""Opens the raw socket to send and receive.
Args:
protocol : short in host byte order. None if ALL
"""
if self._socket is not None:
raise error.TestError('Raw socket already open')
if protocol is None:
self._socket = socket.socket(socket.PF_PACKET,
socket.SOCK_RAW)
self._socket.bind((self._name, self.ETH_P_ALL))
else:
self._socket = socket.socket(socket.PF_PACKET,
socket.SOCK_RAW,
socket.htons(protocol))
self._socket.bind((self._name, self.ETH_P_ALL))
self._socket.settimeout(1) # always running with 1 second timeout
def close(self):
""" Close the raw socket"""
if self._socket is not None:
self._socket.close()
self._socket = None
else:
raise error.TestError('Raw socket not open')
def recv(self, timeout):
"""Synchroneous receive.
Receives one packet from the interface and returns its content
in a string. Wait up to timeout for the packet if timeout is
not 0. This function filters out all the packets that are
less than the minimum ethernet packet size (60+crc).
Args:
timeout: max time in seconds to wait for the read to complete.
'0', wait for ever until a valid packet is received
Returns:
packet: None no packet was received
a binary string containing the received packet.
time_left: amount of time left in timeout
"""
if self._socket is None:
raise error.TestError('Raw socket not open')
time_left = timeout
packet = None
while time_left or (timeout == 0):
try:
packet = self._socket.recv(ethernet.ETH_PACKET_MAX_SIZE)
if len(packet) >= (ethernet.ETH_PACKET_MIN_SIZE-4):
break
packet = None
if timeout and time_left:
time_left -= raw_socket.SOCKET_TIMEOUT
except socket.timeout:
packet = None
if timeout and time_left:
time_left -= raw_socket.SOCKET_TIMEOUT
return packet, time_left
def send(self, packet):
"""Send an ethernet packet."""
if self._socket is None:
raise error.TestError('Raw socket not open')
self._socket.send(packet)
def send_to(self, dst_mac, src_mac, protocol, payload):
"""Send an ethernet frame.
Send an ethernet frame, formating the header.
Args:
dst_mac: 'byte string'
src_mac: 'byte string'
protocol: short in host byte order
payload: 'byte string'
"""
if self._socket is None:
raise error.TestError('Raw socket not open')
try:
packet = ethernet.pack(dst_mac, src_mac, protocol, payload)
except:
raise error.TestError('Invalid Packet')
self.send(packet)
def recv_from(self, dst_mac, src_mac, protocol):
"""Receive an ethernet frame that matches the dst, src and proto.
Filters all received packet to find a matching one, then unpack
it and present it to the caller as a frame.
Waits up to self._socket_timeout for a matching frame before
returning.
Args:
dst_mac: 'byte string'. None do not use in filter.
src_mac: 'byte string'. None do not use in filter.
protocol: short in host byte order. None do not use in filter.
Returns:
ethernet frame: { 'dst' : byte string,
'src' : byte string,
'proto' : short in host byte order,
'payload' : byte string
}
"""
start_time = time.clock()
timeout = self._socket_timeout
while 1:
frame = None
packet, timeout = self.recv(timeout)
if packet is not None:
frame = ethernet.unpack(packet)
if ((src_mac is None or frame['src'] == src_mac) and
(dst_mac is None or frame['dst'] == dst_mac) and
(protocol is None or frame['proto'] == protocol)):
break;
elif (timeout == 0 or
time.clock() - start_time > float(self._socket_timeout)):
frame = None
break
else:
if (timeout == 0 or
time.clock() - start_time > float(self._socket_timeout)):
frame = None
break
continue
return frame
class ethernet(object):
"""Provide ethernet packet manipulation methods."""
HDR_LEN = 14 # frame header length
CHECKSUM_LEN = 4 # frame checksum length
# Ethernet payload types - http://standards.ieee.org/regauth/ethertype
ETH_TYPE_IP = 0x0800 # IP protocol
ETH_TYPE_ARP = 0x0806 # address resolution protocol
ETH_TYPE_CDP = 0x2000 # Cisco Discovery Protocol
ETH_TYPE_8021Q = 0x8100 # IEEE 802.1Q VLAN tagging
ETH_TYPE_IP6 = 0x86DD # IPv6 protocol
ETH_TYPE_LOOPBACK = 0x9000 # used to test interfaces
ETH_TYPE_LLDP = 0x88CC # LLDP frame type
ETH_PACKET_MAX_SIZE = 1518 # maximum ethernet frane size
ETH_PACKET_MIN_SIZE = 64 # minimum ethernet frane size
ETH_LLDP_DST_MAC = '01:80:C2:00:00:0E' # LLDP destination mac
FRAME_KEY_DST_MAC = 'dst' # frame destination mac address
FRAME_KEY_SRC_MAC = 'src' # frame source mac address
FRAME_KEY_PROTO = 'proto' # frame protocol
FRAME_KEY_PAYLOAD = 'payload' # frame payload
def __init__(self):
pass;
@staticmethod
def mac_string_to_binary(hwaddr):
"""Converts a MAC address text string to byte string.
Converts a MAC text string from a text string 'aa:aa:aa:aa:aa:aa'
to a byte string 'xxxxxxxxxxxx'
Args:
hwaddr: a text string containing the MAC address to convert.
Returns:
A byte string.
"""
val = ''.join([chr(b) for b in [int(c, 16) \
for c in hwaddr.split(':',6)]])
return val
@staticmethod
def mac_binary_to_string(hwaddr):
"""Converts a MAC address byte string to text string.
Converts a MAC byte string 'xxxxxxxxxxxx' to a text string
'aa:aa:aa:aa:aa:aa'
Args:
hwaddr: a byte string containing the MAC address to convert.
Returns:
A text string.
"""
return "%02x:%02x:%02x:%02x:%02x:%02x" % tuple(map(ord,hwaddr))
@staticmethod
def pack(dst, src, protocol, payload):
"""Pack a frame in a byte string.
Args:
dst: destination mac in byte string format
src: src mac address in byte string format
protocol: short in network byte order
payload: byte string payload data
Returns:
An ethernet frame with header and payload in a byte string.
"""
# numbers are converted to network byte order (!)
frame = struct.pack("!6s6sH", dst, src, protocol) + payload
return frame
@staticmethod
def unpack(raw_frame):
"""Unpack a raw ethernet frame.
Returns:
None on error
{ 'dst' : byte string,
'src' : byte string,
'proto' : short in host byte order,
'payload' : byte string
}
"""
packet_len = len(raw_frame)
if packet_len < ethernet.HDR_LEN:
return None
payload_len = packet_len - ethernet.HDR_LEN
frame = {}
frame[ethernet.FRAME_KEY_DST_MAC], \
frame[ethernet.FRAME_KEY_SRC_MAC], \
frame[ethernet.FRAME_KEY_PROTO] = \
struct.unpack("!6s6sH", raw_frame[:ethernet.HDR_LEN])
frame[ethernet.FRAME_KEY_PAYLOAD] = \
raw_frame[ethernet.HDR_LEN:ethernet.HDR_LEN+payload_len]
return frame
def ethernet_packet():
try:
from autotest_lib.client.bin.net import site_net_utils
return site_net_utils.ethernet()
except:
return ethernet()