# Plain text  |  1050 lines  |  39.5 KB

#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import logging
import numpy
import os
import re
import tempfile
import threading
import time

from glob import glob
from autotest_lib.client.bin import test, utils
from autotest_lib.client.bin.input.input_device import *
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.audio import alsa_utils
from autotest_lib.client.cros.audio import audio_data
from autotest_lib.client.cros.audio import cmd_utils
from autotest_lib.client.cros.audio import cras_utils
from autotest_lib.client.cros.audio import sox_utils

LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'

# Path of the audio diagnostics executable run by get_audio_diagnostics().
_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'

# Defaults used by the recording / sox helpers in this module.
_DEFAULT_NUM_CHANNELS = 2
_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
_DEFAULT_PLAYBACK_VOLUME = 100
_DEFAULT_CAPTURE_GAIN = 2500
_DEFAULT_ALSA_MAX_VOLUME = '100%'
_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'

# Minimum RMS value to pass when checking recorded file.
_DEFAULT_SOX_RMS_THRESHOLD = 0.08

# All patterns below are raw strings so that regex escapes such as \d and \s
# are passed to the regex engine verbatim instead of being interpreted (and
# warned about) as string-literal escape sequences.

# Patterns applied to the output of amixer.
_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')

# Patterns applied to the output of the sox stat effect.
_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')

# Patterns applied to the output of loopback_latency.
_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'

# Tools from platform/audiotest
AUDIOFUNTEST_PATH = 'audiofuntest'
AUDIOLOOP_PATH = 'looptest'
LOOPBACK_LATENCY_PATH = 'loopback_latency'
SOX_PATH = 'sox'
TEST_TONES_PATH = 'test_tones'

# Signals whose L2 norm is not above this value are rejected by
# get_max_cross_correlation.
_MINIMUM_NORM = 0.001
# Correlation index that must be exceeded for two signals to compare equal
# when the caller does not supply 'correlation_threshold'.
_CORRELATION_INDEX_THRESHOLD = 0.999
# The minimum difference of estimated frequencies between two sine waves.
_FREQUENCY_DIFF_THRESHOLD = 20
# The minimum RMS value of meaningful audio data.
_MEANINGFUL_RMS_THRESHOLD = 0.001

def set_mixer_controls(mixer_settings=None, card='0'):
    """Sets all mixer controls listed in the mixer settings on card.

    Each control is applied with `amixer -c <card> cset name=<name> <value>`.
    A failing amixer command is logged and skipped rather than raised.

    @param mixer_settings: An iterable of dicts, each with a 'name' and a
        'value' key. None (the default) means there is nothing to set.
    @param card: Index of audio card to set mixer settings for.
    """
    # A None default avoids sharing one mutable default object across calls.
    if mixer_settings is None:
        mixer_settings = []

    logging.info('Setting mixer control values on %s', card)
    for item in mixer_settings:
        logging.info('Setting %s to %s on card %s',
                     item['name'], item['value'], card)
        cmd = 'amixer -c %s cset name=%s %s'
        cmd = cmd % (card, item['name'], item['value'])
        try:
            utils.system(cmd)
        except error.CmdError:
            # A card is allowed not to support all the controls, so don't
            # fail the test here if we get an error.
            logging.info('amixer command failed: %s', cmd)

def set_volume_levels(volume, capture):
    """Applies playback volume and capture gain using cras_test_client.

    After both values are set, the server info is dumped, mute is cleared
    and the amixer contents of card 0 are printed.

    @param volume: The playback volume to set.
    @param capture: The capture gain to set.
    """
    volume_cmd = '/usr/bin/cras_test_client --volume %d' % volume
    logging.info('Setting volume level to %d', volume)
    utils.system(volume_cmd)

    gain_cmd = '/usr/bin/cras_test_client --capture_gain %d' % capture
    logging.info('Setting capture gain to %d', capture)
    utils.system(gain_cmd)

    # Follow-up commands that take no arguments, run in this exact order.
    follow_up_cmds = (
            '/usr/bin/cras_test_client --dump_server_info',
            '/usr/bin/cras_test_client --mute 0',
            'amixer -c 0 contents',
    )
    for follow_up in follow_up_cmds:
        utils.system(follow_up)

def loopback_latency_check(**args):
    """Checks loopback latency.

    Runs the loopback_latency tool and parses the measured and reported
    latency from its output.

    @param args: additional arguments for loopback_latency. Only 'n' (the
        noise threshold, 400 when omitted) is used.

    @return A tuple containing measured and reported latency in uS.
        Return None if no audio detected, or if either latency is missing
        from the output (a parsed value of 0 is treated as missing).
    """
    # dict.get works on every Python version; dict.has_key is gone in 3.x.
    noise_threshold = str(args.get('n', 400))

    cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold)

    output = utils.system_output(cmd, retain_output=True)

    # Sleep for a short while to make sure device is not busy anymore
    # after called loopback_latency.
    time.sleep(.1)

    measured_latency = None
    reported_latency = None
    for line in output.split('\n'):
        match = re.search(_MEASURED_LATENCY_RE, line, re.I)
        if match:
            measured_latency = int(match.group(1))
            continue
        match = re.search(_REPORTED_LATENCY_RE, line, re.I)
        if match:
            reported_latency = int(match.group(1))
            continue
        # The tool explicitly reported that it heard nothing.
        if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
            return None
    if measured_latency and reported_latency:
        return (measured_latency, reported_latency)
    else:
        # Should not reach here, just in case.
        return None

def get_mixer_jack_status(jack_reg_exp):
    """Reads the plug state of a jack control through amixer on card 0.

    The control list is searched for the first line matching jack_reg_exp;
    the numid captured by the first group is then queried with `cget`.

    @param jack_reg_exp: The compiled regular expression to match jack
        control name. Its first group must capture the control numid.

    @return None if the control does not exist, return True if jack control
        is detected plugged, return False otherwise.
    """
    controls = utils.system_output('amixer -c0 controls', retain_output=True)
    hits = (jack_reg_exp.match(line) for line in controls.split('\n'))
    numid = next((hit.group(1) for hit in hits if hit), None)

    # Nothing matched (or the captured numid is empty): no such control.
    if not numid:
        return None

    values = utils.system_output('amixer -c0 cget numid=%s' % numid)
    return any(_JACK_VALUE_ON_RE.match(line) for line in values.split('\n'))

def get_hp_jack_status():
    """Gets the status of headphone jack.

    @return The amixer jack status when the control exists; otherwise the
        insert state of the first input device that is a headphone jack;
        None when neither source is available.
    """
    mixer_status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    if mixer_status is not None:
        return mixer_status

    # When headphone jack is not found in amixer, lookup input devices
    # instead.
    #
    # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
    # possibly replace the existing check using amixer.
    for event_path in glob('/dev/input/event*'):
        candidate = InputDevice(event_path)
        if candidate.is_hp_jack():
            return candidate.get_headphone_insert()

    # No input device claims to be a headphone jack.
    return None

def get_mic_jack_status():
    """Gets the status of mic jack.

    @return The amixer jack status when the control exists; otherwise the
        insert state of the first input device that is a mic jack; None
        when neither source is available.
    """
    mixer_status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    if mixer_status is not None:
        return mixer_status

    # When mic jack is not found in amixer, lookup input devices instead.
    for event_path in glob('/dev/input/event*'):
        candidate = InputDevice(event_path)
        if candidate.is_mic_jack():
            return candidate.get_microphone_insert()

    # No input device claims to be a mic jack.
    return None

def log_loopback_dongle_status():
    """Log the status of the loopback dongle to make sure it is equipped.

    Three checks are logged: mic jack status, headphone jack status and a
    loopback latency measurement. The final line reports PASS only when all
    three succeed. Nothing is returned or raised on FAIL.
    """
    # Check Mic Jack
    mic_plugged = get_mic_jack_status()
    logging.info('Mic jack status: %s', mic_plugged)

    # Check Headphone Jack
    hp_plugged = get_hp_jack_status()
    logging.info('Headphone jack status: %s', hp_plugged)

    # Use latency check to test if audio can be captured through dongle.
    # We only want to know the basic function of dongle, so no need to
    # assert the latency accuracy here.
    latency = loopback_latency_check(n=4000)
    if not latency:
        logging.info('Latency check fail.')
    else:
        measured, reported = latency
        logging.info('Got latency measured %d, reported %d',
                     measured, reported)

    all_ok = bool(mic_plugged) and bool(hp_plugged) and bool(latency)
    verdict = 'PASS' if all_ok else 'FAIL'
    logging.info('audio loopback dongle test: %s', verdict)

# Functions to test audio playback.
def play_sound(duration_seconds=None, audio_file_path=None):
    """Plays |audio_file_path| with aplay for |duration_seconds|.

    A falsy |audio_file_path| selects the default sine440.wav file.
    A falsy |duration_seconds| omits the -d option, so aplay is run without
    a duration limit.

    @param duration_seconds: Duration to play sound.
    @param audio_file_path: Path to the audio file.
    """
    wav_path = (audio_file_path or
                '/usr/local/autotest/cros/audio/sine440.wav')
    if duration_seconds:
        duration_arg = '-d %d' % duration_seconds
    else:
        duration_arg = ''
    utils.system('aplay %s %s' % (duration_arg, wav_path))

def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
                       sample_size=16):
    """Builds the sox argument list that synthesizes a sine tone to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.

    @return A list of strings: the sox executable followed by its arguments.
    """
    base_args = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
                 odev, 'synth', str(duration)]

    tone = str(freq)
    if channel == 0:
        # Tone on the first synth channel, a 0 Hz sine on the second.
        synth_args = ['sine', tone, 'sine', '0']
    elif channel == 1:
        # A 0 Hz sine on the first synth channel, tone on the second.
        synth_args = ['sine', '0', 'sine', tone]
    else:
        synth_args = ['sine', tone]

    return base_args + synth_args

def play_sine(channel, odev='default', freq=1000, duration=10,
              sample_size=16):
    """Synthesizes a sine tone with sox and plays it to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.
    """
    sox_args = get_play_sine_args(channel, odev=odev, freq=freq,
                                  duration=duration, sample_size=sample_size)
    command_line = ' '.join(sox_args)
    utils.system(command_line)

# Functions to compose customized sox command, execute it and process the
# output of sox command.
def get_sox_mixer_cmd(infile, channel,
                      num_channels=_DEFAULT_NUM_CHANNELS,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Gets sox mixer command to reduce channel.

    The command reads infile as two-channel audio and writes one channel to
    stdout, using a comma separated pan list in which only the entry for
    |channel| is 1.

    @param infile: Input file name.
    @param channel: The selected channel to take effect.
    @param num_channels: The number of total channels to test. Determines
        the number of pan entries (at least one is always emitted).
    @param sox_format: Format to generate sox command.

    @return The sox command line as a string.
    """
    # The entry for channel 0 is always present; the rest depend on
    # num_channels.
    pan_entries = ['1' if channel == 0 else '0']
    pan_entries.extend('1' if channel == pan_index else '0'
                       for pan_index in range(1, num_channels))
    pan_values = ','.join(pan_entries)

    return '%s -c 2 %s %s -c 1 %s - mixer %s' % (
            SOX_PATH, sox_format, infile, sox_format, pan_values)

def sox_stat_output(infile, channel,
                    num_channels=_DEFAULT_NUM_CHANNELS,
                    sox_format=_DEFAULT_SOX_FORMAT):
    """Runs sox stat on one channel of infile.

    The channel is first extracted with the sox mixer command and piped
    into a second sox process running the stat effect, whose stderr is
    redirected to stdout.

    @param infile: Input file name.
    @param channel: The selected channel.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The output of sox stat command
    """
    extract_stage = get_sox_mixer_cmd(infile, channel,
                                      num_channels=num_channels,
                                      sox_format=sox_format)
    stat_stage = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    pipeline = '%s | %s' % (extract_stage, stat_stage)
    return utils.system_output(pipeline, retain_output=True)

def get_audio_rms(sox_output):
    """Gets the audio RMS value from sox stat output

    @param sox_output: Output of sox stat command.

    @return The RMS value (a float) parsed from the first line that starts
        with the RMS amplitude field, or None when no line matches.
    """
    candidates = (_SOX_RMS_AMPLITUDE_RE.match(line)
                  for line in sox_output.split('\n'))
    first_hit = next((hit for hit in candidates if hit is not None), None)
    if first_hit is None:
        return None
    return float(first_hit.group(1))

def get_rough_freq(sox_output):
    """Gets the rough audio frequency from sox stat output

    @param sox_output: Output of sox stat command.

    @return The rough frequency (an int) parsed from the first line that
        starts with the rough frequency field, or None when no line matches.
    """
    candidates = (_SOX_ROUGH_FREQ_RE.match(line)
                  for line in sox_output.split('\n'))
    first_hit = next((hit for hit in candidates if hit is not None), None)
    if first_hit is None:
        return None
    return int(first_hit.group(1))

def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    """Verifies that the RMS amplitude in sox stat output is high enough.

    @param sox_output: The output from sox stat command.
    @param sox_threshold: The threshold to test RMS value against.

    @raises error.TestError if RMS amplitude can't be parsed.
    @raises error.TestFail if the RMS amplitude of the recording isn't above
            the threshold.
    """
    measured_rms = get_audio_rms(sox_output)

    # In case we don't get a valid RMS value.
    if measured_rms is None:
        raise error.TestError(
            'Failed to generate an audio RMS value from playback.')

    logging.info('Got audio RMS value of %f. Minimum pass is %f.',
                 measured_rms, sox_threshold)

    if measured_rms < sox_threshold:
        failure = ('Audio RMS value %f too low. Minimum pass is %f.' %
                   (measured_rms, sox_threshold))
        raise error.TestFail(failure)

def noise_reduce_file(in_file, noise_file, out_file,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Runs the sox command to reduce noise.

    A first sox process builds a noise profile from noise_file and pipes it
    into a second sox process that applies the noisered effect to in_file
    and writes out_file. Both files are treated as two-channel audio.

    @param in_file: The file to noise reduce.
    @param noise_file: The file containing the noise profile.
        This can be created by recording silence.
    @param out_file: The file contains the noise reduced sound.
    @param sox_format: The sox format to generate sox command.
    """
    profile_stage = '%s -c 2 %s %s -n noiseprof' % (
            SOX_PATH, sox_format, noise_file)
    reduce_stage = '%s -c 2 %s %s -c 2 %s %s noisered' % (
            SOX_PATH, sox_format, in_file, sox_format, out_file)
    pipeline = '%s | %s' % (profile_stage, reduce_stage)
    utils.system(pipeline)

def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    """Records a sample by running record_command with tmpfile appended.

    @param tmpfile: The file to record to.
    @param record_command: The command to record audio.
    """
    full_command = '%s %s' % (record_command, tmpfile)
    utils.system(full_command)

def create_wav_file(wav_dir, prefix=""):
    """Builds a timestamped wav file path inside wav_dir.

    Only the path is composed; no file is created on disk by this function.
    The name has the form "<prefix>-<current time>.wav".

    @param wav_dir: The directory of created wav file.
    @param prefix: specified file name prefix.

    @return The joined path of wav_dir and the generated file name.
    """
    timestamp = time.time()
    return os.path.join(wav_dir, "%s-%s.wav" % (prefix, timestamp))

def run_in_parallel(*funs):
    """Runs each callable in its own thread and waits for all to finish.

    @param funs: methods to run. Each is called without arguments.
    """
    workers = [threading.Thread(target=fun) for fun in funs]

    for worker in workers:
        worker.start()

    # Block until every worker has completed.
    for worker in workers:
        worker.join()

def loopback_test_channels(noise_file_name, wav_dir,
                           playback_callback=None,
                           check_recorded_callback=check_audio_rms,
                           preserve_test_file=True,
                           num_channels=_DEFAULT_NUM_CHANNELS,
                           record_callback=record_sample,
                           mix_callback=None):
    """Tests loopback on all channels.

    For every channel, recording (and, when given, playback and mixing) run
    in parallel. The recording is then noise-reduced with the pre-recorded
    noise file and the sox stat output of the reduced file is handed to
    check_recorded_callback.

    @param noise_file_name: Name of the file contains pre-recorded noise.
    @param wav_dir: The directory of created wav file.
    @param playback_callback: The callback to do the playback for
        one channel. Called with the channel index.
    @param record_callback: The callback to do the recording. Called with
        the file name to record to.
    @param check_recorded_callback: The callback to check recorded file.
        Called with the sox stat output of the noise-reduced recording.
    @param preserve_test_file: Retain the recorded files for future debugging.
    @param num_channels: The number of total channels to test.
    @param mix_callback: The callback to do on the one-channel file. Called
        with the mix file name.
    """
    for channel in range(num_channels):
        recorded_path = create_wav_file(wav_dir, "record-%d" % channel)
        jobs = [lambda: record_callback(recorded_path)]

        if playback_callback:
            jobs.append(lambda: playback_callback(channel))

        mixed_path = None
        if mix_callback:
            mixed_path = create_wav_file(wav_dir, "mix-%d" % channel)
            jobs.append(lambda: mix_callback(mixed_path))

        # The lambdas are consumed before the loop variables change, so
        # capturing them by name is safe here.
        run_in_parallel(*jobs)

        if mix_callback:
            mixed_rms = get_audio_rms(sox_stat_output(mixed_path, channel))
            logging.info('Got mixed audio RMS value of %f.', mixed_rms)

        recorded_rms = get_audio_rms(sox_stat_output(recorded_path, channel))
        logging.info('Got recorded audio RMS value of %f.', recorded_rms)

        reduced_path = create_wav_file(wav_dir, "reduced-%d" % channel)
        noise_reduce_file(recorded_path, noise_file_name, reduced_path)

        reduced_stat = sox_stat_output(reduced_path, channel)

        # Files are removed before the check so that a failing check does
        # not leave them behind when preservation is disabled.
        if not preserve_test_file:
            disposable = [reduced_path, recorded_path]
            if mix_callback:
                disposable.append(mixed_path)
            for stale_path in disposable:
                os.unlink(stale_path)

        check_recorded_callback(reduced_stat)


def get_channel_sox_stat(
        input_audio, channel_index, channels=2, bits=16, rate=48000):
    """Gets the sox stat info of the selected channel in the input audio file.

    For single-channel input the stat is taken from the file directly.
    Otherwise one sox process extracts the channel to stdout and a second
    one reads it from stdin and runs stat; the stat text is read from the
    second process's stderr.

    @param input_audio: The input audio file to be analyzed.
    @param channel_index: The index of the channel to be analyzed.
                          (1 for the first channel).
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The parsed sox stat result for the selected channel.

    @raises ValueError if channel_index is not within [1, channels].
    """
    if channel_index <= 0 or channel_index > channels:
        raise ValueError('incorrect channel_index: %d' % channel_index)

    if channels == 1:
        return sox_utils.get_stat(
                input_audio, channels=channels, bits=bits, rate=rate)

    # Stage 1: extract the requested channel and stream it to stdout.
    p1 = cmd_utils.popen(
            sox_utils.extract_channel_cmd(
                    input_audio, '-', channel_index,
                    channels=channels, bits=bits, rate=rate),
            stdout=cmd_utils.PIPE)
    # Stage 2: run stat on the streamed single-channel data.
    p2 = cmd_utils.popen(
            sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
            stdin=p1.stdout, stderr=cmd_utils.PIPE)
    stat_output = p2.stderr.read()
    cmd_utils.wait_and_check_returncode(p1, p2)
    return sox_utils.parse_stat_output(stat_output)


def get_rms(input_audio, channels=1, bits=16, rate=48000):
    """Gets the RMS values of all channels of the input audio.

    @param input_audio: The input audio file to be checked.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with one RMS value per channel, in channel order.
    """
    stats = []
    # Channel indices passed to get_channel_sox_stat start at 1.
    for channel_index in range(1, channels + 1):
        stats.append(get_channel_sox_stat(
                input_audio, channel_index, channels=channels, bits=bits,
                rate=rate))

    logging.info('sox stat: %s', list(map(str, stats)))
    return [stat.rms for stat in stats]


def reduce_noise_and_get_rms(
        input_audio, noise_file, channels=1, bits=16, rate=48000):
    """Noise-reduces the input audio and returns the per-channel RMS values.

    A noise profile built from noise_file is piped into a noise-reduce
    process that writes to a temporary file. The RMS values are read from
    that file before it is discarded.

    @param input_audio: The input audio file to be analyzed.
    @param noise_file: The noise file used to reduce noise in the input audio.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with one RMS value per channel of the reduced audio.
    """
    with tempfile.NamedTemporaryFile() as reduced_file:
        profile_cmd = sox_utils.noise_profile_cmd(
                noise_file, '-', channels=channels, bits=bits, rate=rate)
        profiler = cmd_utils.popen(profile_cmd, stdout=cmd_utils.PIPE)

        reduce_cmd = sox_utils.noise_reduce_cmd(
                input_audio, reduced_file.name, '-',
                channels=channels, bits=bits, rate=rate)
        reducer = cmd_utils.popen(reduce_cmd, stdin=profiler.stdout)

        cmd_utils.wait_and_check_returncode(profiler, reducer)
        # Must run inside the with-block: the temp file vanishes on exit.
        return get_rms(reduced_file.name, channels, bits, rate)


def skip_devices_to_test(*boards):
    """Devices to skip due to hardware or test compatibility issues.

    @param boards: the boards to skip testing.

    @raises error.TestNAError if the current board is one of |boards|.
    """
    # TODO(scottz): Remove this when crbug.com/220147 is fixed.
    current_board = utils.get_current_board()
    if current_board not in boards:
        return
    raise error.TestNAError(
            'This test is not available on %s' % current_board)


def cras_rms_test_setup():
    """Setups for the cras_rms_tests.

    To make sure the line_out-to-mic_in path is all green: sets the system
    and selected output node volume, sets the capture gain, and unmutes both
    system output and capture.
    """
    # TODO(owenlin): Now, the nodes are choosed by chrome.
    #                We should do it here.
    playback_volume = _DEFAULT_PLAYBACK_VOLUME
    cras_utils.set_system_volume(playback_volume)
    cras_utils.set_selected_output_node_volume(playback_volume)

    cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN)

    # Unmute playback first, then capture.
    for set_mute in (cras_utils.set_system_mute, cras_utils.set_capture_mute):
        set_mute(False)


def generate_rms_postmortem():
    """Generates postmortem for rms tests.

    Logs the loopback dongle status and the audio diagnostics. Any exception
    raised while doing so is logged and swallowed, so this never raises.
    """
    try:
        logging.info('audio postmortem report')
        log_loopback_dongle_status()
        diagnostics = get_audio_diagnostics()
        logging.info(diagnostics)
    except Exception:
        # Best effort only: a broken postmortem must not mask the real
        # test result.
        logging.exception('Error while generating postmortem report')


def get_audio_diagnostics():
    """Runs the audio diagnostics executable and returns what it produced.

    @returns: a string containing diagnostic results.

    """
    diagnostics_cmd = [_AUDIO_DIAGNOSTICS_PATH]
    return cmd_utils.execute(diagnostics_cmd, stdout=cmd_utils.PIPE)


def get_max_cross_correlation(signal_a, signal_b):
    """Gets max cross-correlation and best time delay of two signals.

    Computes cross-correlation function between two
    signals and gets the maximum value and time delay.
    The steps includes:
      1. Compute cross-correlation function of X and Y and get Cxy.
         The correlation function Cxy is an array where Cxy[k] is the
         cross product of X and Y when Y is delayed by k.
         Refer to manual of numpy.correlate for detail of correlation.
      2. Find the maximum value C_max and index C_index in Cxy.
      3. Compute L2 norm of X and Y to get norm(X) and norm(Y).
      4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.

    Max cross-correlation indicates the similarity of X and Y. The value
    is 1 if X equals Y multiplied by a positive scalar.
    The value is -1 if X equals Y multiplied by a negative scalar.
    Any constant level shift will be regarded as distortion and will make
    max cross-correlation value deviated from 1.
    C_index is the best time delay of Y that make Y looks similar to X.
    Refer to http://en.wikipedia.org/wiki/Cross-correlation.

    @param signal_a: A list of numbers which contains the first signal.
    @param signal_b: A list of numbers which contains the second signal.

    @raises: ValueError if any number in signal_a or signal_b is not a float.
             ValueError if norm of any array is less than or equal to
             _MINIMUM_NORM.

    @returns: A tuple (correlation index, best delay). If there are more than
              one best delay, just return the first one.
    """
    # Validate signal_a first, then signal_b; reject any non-float sample.
    for signal in (signal_a, signal_b):
        if not all(isinstance(sample, float) for sample in signal):
            raise ValueError('List contains number which is not a float')

    norm_a = numpy.linalg.norm(signal_a)
    norm_b = numpy.linalg.norm(signal_b)
    logging.debug('norm_a: %f', norm_a)
    logging.debug('norm_b: %f', norm_b)
    if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
        raise ValueError('No meaningful data as norm is too small.')

    correlation = numpy.correlate(signal_a, signal_b, 'full')
    peak = max(correlation)
    best_delays = [
            delay for delay, value in enumerate(correlation)
            if value == peak]
    if len(best_delays) > 1:
        logging.warning('There are more than one best delay: %r', best_delays)

    # Normalize the peak by the product of both norms.
    correlation_index = peak / (norm_a * norm_b)
    return correlation_index, best_delays[0]


def trim_data(data, threshold=0):
    """Trims a data by removing value that is too small in head and tail.

    Removes elements in head and tail whose absolute value is smaller than
    or equal to threshold. Elements between the first and the last kept
    element are never removed, whatever their value.
    E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) =
    ([0.3], 3)
    and trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.1) =
    ([0.2, 0.3, 0.2], 2)

    @param data: A list of numbers.
    @param threshold: The threshold to compare against.

    @returns: A tuple (trimmed_data, end_trimmed_length), where
              end_trimmed_length is the length of original data being trimmed
              from the end.
              Returns ([], None) if there is no valid data.
    """
    # Scan from the head for the first element above the threshold.
    first_valid = next(
            (index for index, value in enumerate(data)
             if abs(value) > threshold),
            None)
    if first_valid is None:
        logging.warning(
                'There is no element with absolute value greater '
                'than threshold %f', threshold)
        return [], None

    # Scan from the tail; guaranteed to stop at or after first_valid.
    last_valid = len(data) - 1
    while not abs(data[last_valid]) > threshold:
        last_valid -= 1

    logging.debug('Start and end of indice_valid: %d, %d',
                  first_valid, last_valid)
    end_trimmed_length = len(data) - last_valid - 1
    logging.debug('Trimmed length in the end: %d', end_trimmed_length)
    return (data[first_valid : last_valid + 1], end_trimmed_length)


def get_one_channel_correlation(test_data, golden_data):
    """Gets max cross-correlation of test_data and golden_data.

    Trims test data and compute the max cross-correlation against golden_data.
    Signal can be trimmed because those zero values in the head and tail of
    a signal will not affect correlation computation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.

    @returns: A tuple (max cross-correlation, best_delay) when the max
              cross-correlation is non-zero. Otherwise returns (None, None).
              Refer to docstring of get_max_cross_correlation.

    @raises: ValueError propagated from get_max_cross_correlation when the
             norm of either signal is too small, e.g. when the test data
             trims down to nothing.
    """
    trimmed_test_data, end_trimmed_length = trim_data(test_data)

    # get_max_cross_correlation only accepts floats, so cast both signals.
    golden_as_float = [float(sample) for sample in golden_data]
    trimmed_as_float = [float(sample) for sample in trimmed_test_data]

    max_cross_correlation, best_delay = get_max_cross_correlation(
            golden_as_float, trimmed_as_float)

    # The reason to add back the trimmed length in the end.
    # E.g.:
    # golden data:
    #
    # |-----------vvvv----------------|  vvvv is the signal of interest.
    #       a                 b
    #
    # test data:
    #
    # |---x----vvvv--------x----------------|  x is the place to trim.
    #   c   d         e            f
    #
    # trimmed test data:
    #
    # |----vvvv--------|
    #   d         e
    #
    # The first output of cross correlation computation :
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    # |----vvvv--------|
    #   d         e
    #
    # The largest output of cross correlation computation happens at
    # delay a + e.
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    #                         |----vvvv--------|
    #                           d         e
    #
    # Cross correlation starts computing by aligning the last sample
    # of the trimmed test data to the first sample of golden data.
    # The best delay calculated from trimmed test data and golden data
    # cross correlation is e + a. But the real best delay that should be
    # identical on two channel should be e + a + f.
    # So we need to add back the length being trimmed in the end.

    if not max_cross_correlation:
        return None, None
    return max_cross_correlation, best_delay + end_trimmed_length


def compare_one_channel_correlation(test_data, golden_data, parameters):
    """Compares two one-channel data by correlation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.
    @param parameters: A dict containing parameters for method. The optional
                       key 'correlation_threshold' overrides the default
                       threshold.

    @returns: A dict containing:
              index: The index of similarity where 1 means they are different
                  only by a positive scale.
              best_delay: The best delay of test data in relative to golden
                  data.
              equal: A bool containing comparing result. True only when the
                  index is non-zero and strictly above the threshold.
    """
    threshold = (parameters['correlation_threshold']
                 if 'correlation_threshold' in parameters
                 else _CORRELATION_INDEX_THRESHOLD)

    index, best_delay = get_one_channel_correlation(test_data, golden_data)

    # A None (or zero) index short-circuits to a falsy value before the
    # comparison is attempted.
    result_dict = {
            'index': index,
            'best_delay': best_delay,
            'equal': bool(index and index > threshold),
    }
    logging.debug('result_dict: %r', result_dict)
    return result_dict


def get_one_channel_stat(data, data_format):
    """Gets statistic information of data.

    The samples are written to a temporary raw file which is then analyzed
    with sox stat.

    @param data: A list containing one channel data.
    @param data_format: A dict containing data format of data. The keys
                        'sample_format' and 'rate' are read.

    @return: The sox stat parsed result. An object containing
             sample_count: An int. Samples read.
             length: A float. Length in seconds.
             rms: A float. RMS amplitude.
             rough_frequency: A float. Rough frequency.

    @raises: ValueError if data is empty.
    """
    if not data:
        raise ValueError('Data is empty. Can not get stat')

    sample_format = data_format['sample_format']
    raw_data = audio_data.AudioRawData(
            binary=None, channel=1, sample_format=sample_format)
    raw_data.copy_channel_data([data])

    with tempfile.NamedTemporaryFile() as raw_data_file:
        raw_data.write_to_file(raw_data_file.name)

        # Sample width in bits, derived from the format's byte size.
        size_bytes = audio_data.SAMPLE_FORMATS[sample_format]['size_bytes']
        return sox_utils.get_stat(raw_data_file.name, channels=1,
                                  bits=8 * size_bytes,
                                  rate=data_format['rate'])


def compare_one_channel_frequency(test_data, test_data_format,
                                  golden_data, golden_data_format):
    """Compares two one-channel data by frequency.

    The rough frequency of each signal is taken from its sox stat result;
    the signals compare equal when the two frequencies differ by less than
    _FREQUENCY_DIFF_THRESHOLD.

    @param test_data: A list containing the data to compare against golden data.
    @param test_data_format: A dict containing data format of test data.
    @param golden_data: A list containing the golden data.
    @param golden_data_format: A dict containing data format of golden data.

    @returns: A dict containing:
              test_data_frequency: test data frequency.
              golden_data_frequency: golden data frequency.
              equal: A bool containing comparing result.

    @raises: ValueError if the test data RMS is too small to be meaningful.

    """
    golden_data_stat = get_one_channel_stat(golden_data, golden_data_format)
    logging.info('Get golden data one channel stat: %s', golden_data_stat)
    test_data_stat = get_one_channel_stat(test_data, test_data_format)
    logging.info('Get test data one channel stat: %s', test_data_stat)

    golden_frequency = golden_data_stat.rough_frequency
    test_frequency = test_data_stat.rough_frequency
    result_dict = {
            'golden_data_frequency': golden_frequency,
            'test_data_frequency': test_frequency,
            'equal': bool(abs(test_frequency - golden_frequency) <
                          _FREQUENCY_DIFF_THRESHOLD),
    }
    logging.debug('result_dict: %r', result_dict)

    # The result is logged first so it is still visible when the recording
    # turns out to be too quiet to trust.
    if test_data_stat.rms < _MEANINGFUL_RMS_THRESHOLD:
        # Format the message explicitly: unlike logging calls, exception
        # constructors do not interpolate %-style arguments.
        raise ValueError(
                'Recorded RMS %f is too small to be meaningful.' %
                test_data_stat.rms)
    return result_dict


def compare_one_channel_data(test_data, test_data_format,
                             golden_data, golden_data_format, method,
                             parameters):
    """Compares two one-channel data using the requested method.

    @param test_data: A list containing the data to compare against golden data.
    @param test_data_format: The data format of test data. Only used by the
                             'frequency' method.
    @param golden_data: A list containing the golden data.
    @param golden_data_format: The data format of golden data. Only used by
                               the 'frequency' method.
    @param method: The comparing method, either 'correlation' or 'frequency'.
    @param parameters: A dict containing parameters for method. Only used by
                       the 'correlation' method.

    @returns: The dict returned by the selected comparison.
              For 'correlation' it contains:
              index: The index of similarity where 1 means they are different
                  only by a positive scale.
              best_delay: The best delay of test data in relative to golden
                  data.
              equal: A bool containing comparing result.
              For 'frequency' it contains:
              test_data_frequency: test data frequency.
              golden_data_frequency: golden data frequency.
              equal: A bool containing comparing result.

    @raises: NotImplementedError if method is not supported.
    """
    if method == 'correlation':
        outcome = compare_one_channel_correlation(
                test_data, golden_data, parameters)
    elif method == 'frequency':
        outcome = compare_one_channel_frequency(
                test_data, test_data_format, golden_data, golden_data_format)
    else:
        raise NotImplementedError('method %s is not implemented' % method)
    return outcome


def compare_data(golden_data_binary, golden_data_format,
                 test_data_binary, test_data_format,
                 channel_map, method, parameters=None):
    """Compares raw test data against raw golden data channel by channel.

    @param golden_data_binary: The binary containing golden data.
    @param golden_data_format: The data format of golden data.
    @param test_data_binary: The binary containing test data.
    @param test_data_format: The data format of test data.
    @param channel_map: A list containing channel mapping, indexed by test
                        data channel. Each entry is the golden data channel to
                        compare with, or None to skip that test channel.
                        E.g. [1, 0, None, None, None, None, None, None] means
                        channel 0 of test data should map to channel 1 of
                        golden data. Channel 1 of test data should map to
                        channel 0 of golden data. Channel 2 to 7 of test data
                        should be skipped.
    @param method: The method to compare data. Use 'correlation' to compare
                   general data. Use 'frequency' to compare data containing
                   sine wave.
    @param parameters: A dict containing parameters for method, if needed.

    @returns: True if at least one channel was compared and every compared
              channel is equal (and, for 'correlation', all compared channels
              share the same best delay). False otherwise.

    @raises: NotImplementedError if file type is not raw.
             NotImplementedError if sampling rates of two data are not the same.
    """
    if parameters is None:
        parameters = {}

    both_raw = (golden_data_format['file_type'] == 'raw' and
                test_data_format['file_type'] == 'raw')
    if not both_raw:
        raise NotImplementedError('Only support raw data in compare_data.')
    if golden_data_format['rate'] != test_data_format['rate']:
        raise NotImplementedError(
                'Only support comparing data with the same sampling rate')

    golden_raw = audio_data.AudioRawData(
            binary=golden_data_binary,
            channel=golden_data_format['channel'],
            sample_format=golden_data_format['sample_format'])
    test_raw = audio_data.AudioRawData(
            binary=test_data_binary,
            channel=test_data_format['channel'],
            sample_format=test_data_format['sample_format'])

    # One result dict per compared (test channel, golden channel) pair.
    per_channel_results = []
    for test_index, golden_index in enumerate(channel_map):
        if golden_index is None:
            logging.info('Skipped channel %d', test_index)
            continue
        test_samples = test_raw.channel_data[test_index]
        golden_samples = golden_raw.channel_data[golden_index]
        entry = {'test_channel': test_index, 'golden_channel': golden_index}
        entry.update(
                compare_one_channel_data(
                        test_samples, test_data_format,
                        golden_samples, golden_data_format, method,
                        parameters))
        per_channel_results.append(entry)
    logging.info('compare_results: %r', per_channel_results)

    # Comparing nothing at all counts as a failure.
    all_match = bool(per_channel_results)
    for entry in per_channel_results:
        if entry['equal']:
            continue
        logging.error(
                'Failed on test channel %d and golden channel %d',
                entry['test_channel'], entry['golden_channel'])
        all_match = False

    # For correlation, every compared channel must also report exactly the
    # same best delay.
    if method == 'correlation':
        distinct_delays = set(
                entry['best_delay'] for entry in per_channel_results)
        if len(distinct_delays) > 1:
            logging.error('There are more than one best delay.')
            all_match = False
    return all_match


class _base_rms_test(test.test):
    """Common base class for the rms_test classes in this file."""

    def postprocess(self):
        """Runs the parent postprocess, then a postmortem on failures.

        generate_rms_postmortem() is called when self.failed_constraints
        holds at least one failed constraint.
        """
        super(_base_rms_test, self).postprocess()

        # Total number of failed constraints over all entries of
        # self.failed_constraints.
        failed_total = sum(map(len, self.failed_constraints))
        if failed_total:
            generate_rms_postmortem()


class chrome_rms_test(_base_rms_test):
    """Base test class for audio RMS test with Chrome.

    The chrome instance can be accessed by self.chrome once warmup has
    created it.
    """
    def warmup(self):
        """Starts Chrome and then runs cras_rms_test_setup().

        If cras_rms_test_setup() raises, the browser is closed and the
        exception is re-raised.
        """
        # NOTE(review): presumably this skips the test on x86-mario boards --
        # confirm against skip_devices_to_test.
        skip_devices_to_test('x86-mario')
        super(chrome_rms_test, self).warmup()

        # Not every client of this file uses telemetry, so the import is done
        # here for the tests that really need it.
        from autotest_lib.client.common_lib.cros import chrome

        self.chrome = chrome.Chrome()

        # The audio configuration could be changed when we
        # restart chrome.
        try:
            cras_rms_test_setup()
        except Exception:
            self.chrome.browser.Close()
            raise


    def cleanup(self, *args):
        """Closes the browser, if any, and runs the parent cleanup.

        @param args: Ignored; accepted so the method can be called with
                     positional arguments.
        """
        try:
            # warmup can raise before self.chrome is assigned (from
            # skip_devices_to_test, the parent warmup or chrome.Chrome()), so
            # the attribute may not exist when cleanup runs.
            chrome_instance = getattr(self, 'chrome', None)
            if chrome_instance is not None:
                chrome_instance.browser.Close()
        finally:
            super(chrome_rms_test, self).cleanup()

class cras_rms_test(_base_rms_test):
    """Base test class for CRAS audio RMS test."""

    def warmup(self):
        """Runs the parent warmup and then cras_rms_test_setup()."""
        # NOTE(review): presumably this skips the test on x86-mario boards --
        # confirm against skip_devices_to_test.
        skip_devices_to_test('x86-mario')
        super(cras_rms_test, self).warmup()
        cras_rms_test_setup()


def alsa_rms_test_setup():
    """Sets up ALSA mixer controls for alsa_rms_test.

    Mixer controls differ between boards/chipsets, and controls sharing a name
    may still have different capabilities. This picks one of three groups of
    'sset' mixer commands based on the board, the architecture and whether the
    max98090 codec driver module is present, then applies them to the first
    sound card that has the 'Mic Jack' / 'Mic' control.
    """
    card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic')
    arch = utils.get_arch()
    board = utils.get_board()
    uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090')

    max_volume = _DEFAULT_ALSA_MAX_VOLUME
    capture_gain = _DEFAULT_ALSA_CAPTURE_GAIN

    # Each entry is (mixer command prefix, value); they are applied in order.
    if board in ('daisy_spring', 'daisy_skate'):
        # The MIC controls of the boards do not support dB syntax.
        mixer_settings = [
            ('sset Headphone ', max_volume),
            ('sset MIC1 ', max_volume),
            ('sset MIC2 ', max_volume),
        ]
    elif arch in ('armv7l', 'aarch64') or uses_max98090:
        # ARM platforms or Intel platforms that uses max98090 codec driver.
        mixer_settings = [
            ('sset Headphone ', max_volume),
            ('sset MIC1 ', capture_gain),
            ('sset MIC2 ', capture_gain),
        ]
    else:
        # The rest of Intel platforms.
        mixer_settings = [
            ('sset Master ', max_volume),
            ('sset Capture ', capture_gain),
        ]

    for command_prefix, value in mixer_settings:
        alsa_utils.mixer_cmd(card_id, command_prefix + value)


class alsa_rms_test(_base_rms_test):
    """Base test class for ALSA audio RMS test."""

    def warmup(self):
        """Runs the parent warmup and then alsa_rms_test_setup()."""
        # NOTE(review): presumably this skips the test on x86-mario boards --
        # confirm against skip_devices_to_test.
        skip_devices_to_test('x86-mario')
        super(alsa_rms_test, self).warmup()

        alsa_rms_test_setup()