# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import math
import re
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
PLATFORM_LINUX = 'LINUX'
PLATFORM_MACOS = 'MAC_OS'
def _get_platform_delegate(platform):
if platform == PLATFORM_LINUX:
return LinuxPingDelegate
elif platform == PLATFORM_MACOS:
return MacPingDelegate
else:
raise error.TestError('%s is not a valid platform type', platform)
def _regex_int_from_string(pattern, line):
"""Retrieve an integer from a string, using regex.
@param pattern: The regular expression to apply to the input string.
@param line: String input to retrieve an integer from.
@return integer retrieved from the input string, or None if there is no
match.
"""
m = re.search(pattern, line)
if m is not None:
return int(m.group(1))
return None
def _regex_float_from_string(pattern, line):
"""Retrieve a float from a string, using regex.
@param pattern: The regular expression to apply to the input string.
@param line: String input to retrieve a float from.
@return float retrieved from the input string, or None if there is no
match.
"""
m = re.search(pattern, line)
if m is not None:
return float(m.group(1))
return None
class MacPingDelegate(object):
"""Implement ping functionality for MacOS hosts."""
@staticmethod
def ping_arguments(ping_config):
"""
@param ping_config PingConfig object describing the ping test for which
arguments are needed.
@return list of parameters to ping.
"""
args = []
args.append('-c %d' % ping_config.count)
if ping_config.size is not None:
args.append('-s %d' % ping_config.size)
if ping_config.interval is not None:
args.append('-i %f' % ping_config.interval)
if ping_config.qos is not None:
if ping_config.qos == 'be':
ping_config.append('-k 0')
elif ping_config.qos == 'bk':
ping_config.append('-k 1')
elif ping_config.qos == 'vi':
args.append('-k 2')
elif ping_config.qos == 'vo':
args.append('-k 3')
else:
raise error.TestFail('Unknown QoS value: %s' % ping_config.qos)
# The last argument is the IP address to ping.
args.append(ping_config.target_ip)
return args
@staticmethod
def parse_from_output(ping_output):
"""Extract the ping results from stdout.
@param ping_output string stdout from a ping command.
PING 8.8.8.8 (8.8.8.8): 56 data bytes
64 bytes from 8.8.8.8: icmp_seq=0 ttl=57 time=3.770 ms
64 bytes from 8.8.8.8: icmp_seq=1 ttl=57 time=4.165 ms
64 bytes from 8.8.8.8: icmp_seq=2 ttl=57 time=4.901 ms
--- 8.8.8.8 ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 3.770/4.279/4.901/0.469 ms
"""
loss_line = (filter(lambda x: x.find('packets transmitted') > 0,
ping_output.splitlines()) or [''])[0]
sent = _regex_int_from_string('([0-9]+) packets transmitted', loss_line)
received = _regex_int_from_string('([0-9]+) packets received',
loss_line)
loss = _regex_float_from_string('([0-9]+\.[0-9]+)% packet loss',
loss_line)
if None in (sent, received, loss):
raise error.TestFail('Failed to parse transmission statistics.')
m = re.search('round-trip min\/avg\/max\/stddev = ([0-9.]+)\/([0-9.]+)'
'\/([0-9.]+)\/([0-9.]+) ms', ping_output)
if m is not None:
return PingResult(sent, received, loss,
min_latency=float(m.group(1)),
avg_latency=float(m.group(2)),
max_latency=float(m.group(3)),
dev_latency=float(m.group(4)))
if received > 0:
raise error.TestFail('Failed to parse latency statistics.')
return PingResult(sent, received, loss)
class LinuxPingDelegate(object):
"""Implement ping functionality specific to the linux platform."""
@staticmethod
def ping_arguments(ping_config):
"""
@param ping_config PingConfig object describing the ping test for which
arguments are needed.
@return list of parameters to ping.
"""
args = []
args.append('-c %d' % ping_config.count)
if ping_config.size is not None:
args.append('-s %d' % ping_config.size)
if ping_config.interval is not None:
args.append('-i %f' % ping_config.interval)
if ping_config.qos is not None:
if ping_config.qos == 'be':
args.append('-Q 0x04')
elif ping_config.qos == 'bk':
args.append('-Q 0x02')
elif ping_config.qos == 'vi':
args.append('-Q 0x08')
elif ping_config.qos == 'vo':
args.append('-Q 0x10')
else:
raise error.TestFail('Unknown QoS value: %s' % ping_config.qos)
# The last argument is the IP address to ping.
args.append(ping_config.target_ip)
return args
@staticmethod
def parse_from_output(ping_output):
"""Extract the ping results from stdout.
@param ping_output string stdout from a ping command.
On error, some statistics may be missing entirely from the output.
An example of output with some errors is:
PING 192.168.0.254 (192.168.0.254) 56(84) bytes of data.
From 192.168.0.124 icmp_seq=1 Destination Host Unreachable
From 192.168.0.124 icmp_seq=2 Destination Host Unreachable
From 192.168.0.124 icmp_seq=3 Destination Host Unreachable
64 bytes from 192.168.0.254: icmp_req=4 ttl=64 time=1171 ms
[...]
64 bytes from 192.168.0.254: icmp_req=10 ttl=64 time=1.95 ms
--- 192.168.0.254 ping statistics ---
10 packets transmitted, 7 received, +3 errors, 30% packet loss,
time 9007ms
rtt min/avg/max/mdev = 1.806/193.625/1171.174/403.380 ms, pipe 3
A more normal run looks like:
PING google.com (74.125.239.137) 56(84) bytes of data.
64 bytes from 74.125.239.137: icmp_req=1 ttl=57 time=1.77 ms
64 bytes from 74.125.239.137: icmp_req=2 ttl=57 time=1.78 ms
[...]
64 bytes from 74.125.239.137: icmp_req=5 ttl=57 time=1.79 ms
--- google.com ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 4007ms
rtt min/avg/max/mdev = 1.740/1.771/1.799/0.042 ms
We also sometimes see result lines like:
9 packets transmitted, 9 received, +1 duplicates, 0% packet loss,
time 90 ms
"""
loss_line = (filter(lambda x: x.find('packets transmitted') > 0,
ping_output.splitlines()) or [''])[0]
sent = _regex_int_from_string('([0-9]+) packets transmitted', loss_line)
received = _regex_int_from_string('([0-9]+) received', loss_line)
loss = _regex_int_from_string('([0-9]+)% packet loss', loss_line)
if None in (sent, received, loss):
raise error.TestFail('Failed to parse transmission statistics.')
m = re.search('(round-trip|rtt) min[^=]*= '
'([0-9.]+)/([0-9.]+)/([0-9.]+)/([0-9.]+)', ping_output)
if m is not None:
return PingResult(sent, received, loss,
min_latency=float(m.group(2)),
avg_latency=float(m.group(3)),
max_latency=float(m.group(4)),
dev_latency=float(m.group(5)))
if received > 0:
raise error.TestFail('Failed to parse latency statistics.')
return PingResult(sent, received, loss)
class PingConfig(object):
"""Describes the parameters for a ping command."""
DEFAULT_COUNT = 10
PACKET_WAIT_MARGIN_SECONDS = 120
def __init__(self, target_ip, count=DEFAULT_COUNT, size=None,
interval=None, qos=None,
ignore_status=False, ignore_result=False):
super(PingConfig, self).__init__()
self.target_ip = target_ip
self.count = count
self.size = size
self.interval = interval
if qos:
qos = qos.lower()
self.qos = qos
self.ignore_status = ignore_status
self.ignore_result = ignore_result
interval_seconds = self.interval or 1
command_time = math.ceil(interval_seconds * self.count)
self.command_timeout_seconds = int(command_time +
self.PACKET_WAIT_MARGIN_SECONDS)
class PingResult(object):
"""Represents a parsed ping command result."""
def __init__(self, sent, received, loss,
min_latency=-1.0, avg_latency=-1.0,
max_latency=-1.0, dev_latency=-1.0):
"""Construct a PingResult.
@param sent: int number of packets sent.
@param received: int number of replies received.
@param loss: int loss as a percentage (0-100)
@param min_latency: float min response latency in ms.
@param avg_latency: float average response latency in ms.
@param max_latency: float max response latency in ms.
@param dev_latency: float response latency deviation in ms.
"""
super(PingResult, self).__init__()
self.sent = sent
self.received = received
self.loss = loss
self.min_latency = min_latency
self.avg_latency = avg_latency
self.max_latency = max_latency
self.dev_latency = dev_latency
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__,
', '.join(['%s=%r' % item
for item in vars(self).iteritems()]))
class PingRunner(object):
"""Delegate to run the ping command on a local or remote host."""
DEFAULT_PING_COMMAND = 'ping'
PING_LOSS_THRESHOLD = 20 # A percentage.
def __init__(self, command_ping=DEFAULT_PING_COMMAND, host=None,
platform=PLATFORM_LINUX):
"""Construct a PingRunner.
@param command_ping optional path or alias of the ping command.
@param host optional host object when a remote host is desired.
"""
super(PingRunner, self).__init__()
self._run = utils.run
if host is not None:
self._run = host.run
self.command_ping = command_ping
self._platform_delegate = _get_platform_delegate(platform)
def simple_ping(self, host_name):
"""Quickly test that a hostname or IPv4 address responds to ping.
@param host_name: string name or IPv4 address.
@return True if host_name responds to at least one ping.
"""
ping_config = PingConfig(host_name, count=3, interval=0.5,
ignore_status=True, ignore_result=True)
ping_result = self.ping(ping_config)
if ping_result is None or ping_result.received == 0:
return False
return True
def ping(self, ping_config):
"""Run ping with the given |ping_config|.
Will assert that the ping had reasonable levels of loss unless
requested not to in |ping_config|.
@param ping_config PingConfig object describing the ping to run.
"""
command_pieces = ([self.command_ping] +
self._platform_delegate.ping_arguments(ping_config))
command = ' '.join(command_pieces)
command_result = self._run(command,
timeout=ping_config.command_timeout_seconds,
ignore_status=True,
ignore_timeout=True)
if not command_result:
if ping_config.ignore_status:
logging.warning('Ping command timed out; cannot parse output.')
return PingResult(ping_config.count, 0, 100)
raise error.TestFail('Ping command timed out unexpectedly.')
if not command_result.stdout:
logging.warning('Ping command returned no output; stderr was %s.',
command_result.stderr)
if ping_config.ignore_result:
return PingResult(ping_config.count, 0, 100)
raise error.TestFail('Ping command failed to yield any output')
if command_result.exit_status and not ping_config.ignore_status:
raise error.TestFail('Ping command failed with code=%d' %
command_result.exit_status)
ping_result = self._platform_delegate.parse_from_output(
command_result.stdout)
if ping_config.ignore_result:
return ping_result
if ping_result.loss > self.PING_LOSS_THRESHOLD:
raise error.TestFail('Lost ping packets: %r.' % ping_result)
logging.info('Ping successful.')
return ping_result