#!/usr/bin/python # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import numpy import os import re import tempfile import threading import time from glob import glob from autotest_lib.client.bin import test, utils from autotest_lib.client.bin.input.input_device import * from autotest_lib.client.common_lib import error from autotest_lib.client.cros.audio import alsa_utils from autotest_lib.client.cros.audio import audio_data from autotest_lib.client.cros.audio import cmd_utils from autotest_lib.client.cros.audio import cras_utils from autotest_lib.client.cros.audio import sox_utils LD_LIBRARY_PATH = 'LD_LIBRARY_PATH' _AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics' _DEFAULT_NUM_CHANNELS = 2 _DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat' _DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L' _DEFAULT_PLAYBACK_VOLUME = 100 _DEFAULT_CAPTURE_GAIN = 2500 _DEFAULT_ALSA_MAX_VOLUME = '100%' _DEFAULT_ALSA_CAPTURE_GAIN = '25dB' # Minimum RMS value to pass when checking recorded file. _DEFAULT_SOX_RMS_THRESHOLD = 0.08 _JACK_VALUE_ON_RE = re.compile('.*values=on') _HP_JACK_CONTROL_RE = re.compile('numid=(\d+).*Headphone\sJack') _MIC_JACK_CONTROL_RE = re.compile('numid=(\d+).*Mic\sJack') _SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)') _SOX_ROUGH_FREQ_RE = re.compile('Rough\s+frequency:\s+(.+)') _AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected' _MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS' _REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS' # Tools from platform/audiotest AUDIOFUNTEST_PATH = 'audiofuntest' AUDIOLOOP_PATH = 'looptest' LOOPBACK_LATENCY_PATH = 'loopback_latency' SOX_PATH = 'sox' TEST_TONES_PATH = 'test_tones' _MINIMUM_NORM = 0.001 _CORRELATION_INDEX_THRESHOLD = 0.999 # The minimum difference of estimated frequencies between two sine waves. _FREQUENCY_DIFF_THRESHOLD = 20 # The minimum RMS value of meaningful audio data. _MEANINGFUL_RMS_THRESHOLD = 0.001 def set_mixer_controls(mixer_settings={}, card='0'): """Sets all mixer controls listed in the mixer settings on card. @param mixer_settings: Mixer settings to set. @param card: Index of audio card to set mixer settings for. """ logging.info('Setting mixer control values on %s', card) for item in mixer_settings: logging.info('Setting %s to %s on card %s', item['name'], item['value'], card) cmd = 'amixer -c %s cset name=%s %s' cmd = cmd % (card, item['name'], item['value']) try: utils.system(cmd) except error.CmdError: # A card is allowed not to support all the controls, so don't # fail the test here if we get an error. logging.info('amixer command failed: %s', cmd) def set_volume_levels(volume, capture): """Sets the volume and capture gain through cras_test_client. @param volume: The playback volume to set. @param capture: The capture gain to set. """ logging.info('Setting volume level to %d', volume) utils.system('/usr/bin/cras_test_client --volume %d' % volume) logging.info('Setting capture gain to %d', capture) utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture) utils.system('/usr/bin/cras_test_client --dump_server_info') utils.system('/usr/bin/cras_test_client --mute 0') utils.system('amixer -c 0 contents') def loopback_latency_check(**args): """Checks loopback latency. @param args: additional arguments for loopback_latency. @return A tuple containing measured and reported latency in uS. Return None if no audio detected. """ noise_threshold = str(args['n']) if args.has_key('n') else '400' cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold) output = utils.system_output(cmd, retain_output=True) # Sleep for a short while to make sure device is not busy anymore # after called loopback_latency. time.sleep(.1) measured_latency = None reported_latency = None for line in output.split('\n'): match = re.search(_MEASURED_LATENCY_RE, line, re.I) if match: measured_latency = int(match.group(1)) continue match = re.search(_REPORTED_LATENCY_RE, line, re.I) if match: reported_latency = int(match.group(1)) continue if re.search(_AUDIO_NOT_FOUND_RE, line, re.I): return None if measured_latency and reported_latency: return (measured_latency, reported_latency) else: # Should not reach here, just in case. return None def get_mixer_jack_status(jack_reg_exp): """Gets the mixer jack status. @param jack_reg_exp: The regular expression to match jack control name. @return None if the control does not exist, return True if jack control is detected plugged, return False otherwise. """ output = utils.system_output('amixer -c0 controls', retain_output=True) numid = None for line in output.split('\n'): m = jack_reg_exp.match(line) if m: numid = m.group(1) break # Proceed only when matched numid is not empty. if numid: output = utils.system_output('amixer -c0 cget numid=%s' % numid) for line in output.split('\n'): if _JACK_VALUE_ON_RE.match(line): return True return False else: return None def get_hp_jack_status(): """Gets the status of headphone jack.""" status = get_mixer_jack_status(_HP_JACK_CONTROL_RE) if status is not None: return status # When headphone jack is not found in amixer, lookup input devices # instead. # # TODO(hychao): Check hp/mic jack status dynamically from evdev. And # possibly replace the existing check using amixer. for evdev in glob('/dev/input/event*'): device = InputDevice(evdev) if device.is_hp_jack(): return device.get_headphone_insert() else: return None def get_mic_jack_status(): """Gets the status of mic jack.""" status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE) if status is not None: return status # When mic jack is not found in amixer, lookup input devices instead. for evdev in glob('/dev/input/event*'): device = InputDevice(evdev) if device.is_mic_jack(): return device.get_microphone_insert() else: return None def log_loopback_dongle_status(): """Log the status of the loopback dongle to make sure it is equipped.""" dongle_status_ok = True # Check Mic Jack mic_jack_status = get_mic_jack_status() logging.info('Mic jack status: %s', mic_jack_status) dongle_status_ok &= bool(mic_jack_status) # Check Headphone Jack hp_jack_status = get_hp_jack_status() logging.info('Headphone jack status: %s', hp_jack_status) dongle_status_ok &= bool(hp_jack_status) # Use latency check to test if audio can be captured through dongle. # We only want to know the basic function of dongle, so no need to # assert the latency accuracy here. latency = loopback_latency_check(n=4000) if latency: logging.info('Got latency measured %d, reported %d', latency[0], latency[1]) else: logging.info('Latency check fail.') dongle_status_ok = False logging.info('audio loopback dongle test: %s', 'PASS' if dongle_status_ok else 'FAIL') # Functions to test audio palyback. def play_sound(duration_seconds=None, audio_file_path=None): """Plays a sound file found at |audio_file_path| for |duration_seconds|. If |audio_file_path|=None, plays a default audio file. If |duration_seconds|=None, plays audio file in its entirety. @param duration_seconds: Duration to play sound. @param audio_file_path: Path to the audio file. """ if not audio_file_path: audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav' duration_arg = ('-d %d' % duration_seconds) if duration_seconds else '' utils.system('aplay %s %s' % (duration_arg, audio_file_path)) def get_play_sine_args(channel, odev='default', freq=1000, duration=10, sample_size=16): """Gets the command args to generate a sine wav to play to odev. @param channel: 0 for left, 1 for right; otherwize, mono. @param odev: alsa output device. @param freq: frequency of the generated sine tone. @param duration: duration of the generated sine tone. @param sample_size: output audio sample size. Default to 16. """ cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa', odev, 'synth', str(duration)] if channel == 0: cmdargs += ['sine', str(freq), 'sine', '0'] elif channel == 1: cmdargs += ['sine', '0', 'sine', str(freq)] else: cmdargs += ['sine', str(freq)] return cmdargs def play_sine(channel, odev='default', freq=1000, duration=10, sample_size=16): """Generates a sine wave and plays to odev. @param channel: 0 for left, 1 for right; otherwize, mono. @param odev: alsa output device. @param freq: frequency of the generated sine tone. @param duration: duration of the generated sine tone. @param sample_size: output audio sample size. Default to 16. """ cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size) utils.system(' '.join(cmdargs)) # Functions to compose customized sox command, execute it and process the # output of sox command. def get_sox_mixer_cmd(infile, channel, num_channels=_DEFAULT_NUM_CHANNELS, sox_format=_DEFAULT_SOX_FORMAT): """Gets sox mixer command to reduce channel. @param infile: Input file name. @param channel: The selected channel to take effect. @param num_channels: The number of total channels to test. @param sox_format: Format to generate sox command. """ # Build up a pan value string for the sox command. if channel == 0: pan_values = '1' else: pan_values = '0' for pan_index in range(1, num_channels): if channel == pan_index: pan_values = '%s%s' % (pan_values, ',1') else: pan_values = '%s%s' % (pan_values, ',0') return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH, sox_format, infile, sox_format, pan_values) def sox_stat_output(infile, channel, num_channels=_DEFAULT_NUM_CHANNELS, sox_format=_DEFAULT_SOX_FORMAT): """Executes sox stat command. @param infile: Input file name. @param channel: The selected channel. @param num_channels: The number of total channels to test. @param sox_format: Format to generate sox command. @return The output of sox stat command """ sox_mixer_cmd = get_sox_mixer_cmd(infile, channel, num_channels, sox_format) stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format) sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd) return utils.system_output(sox_cmd, retain_output=True) def get_audio_rms(sox_output): """Gets the audio RMS value from sox stat output @param sox_output: Output of sox stat command. @return The RMS value parsed from sox stat output. """ for rms_line in sox_output.split('\n'): m = _SOX_RMS_AMPLITUDE_RE.match(rms_line) if m is not None: return float(m.group(1)) def get_rough_freq(sox_output): """Gets the rough audio frequency from sox stat output @param sox_output: Output of sox stat command. @return The rough frequency value parsed from sox stat output. """ for rms_line in sox_output.split('\n'): m = _SOX_ROUGH_FREQ_RE.match(rms_line) if m is not None: return int(m.group(1)) def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD): """Checks if the calculated RMS value is expected. @param sox_output: The output from sox stat command. @param sox_threshold: The threshold to test RMS value against. @raises error.TestError if RMS amplitude can't be parsed. @raises error.TestFail if the RMS amplitude of the recording isn't above the threshold. """ rms_val = get_audio_rms(sox_output) # In case we don't get a valid RMS value. if rms_val is None: raise error.TestError( 'Failed to generate an audio RMS value from playback.') logging.info('Got audio RMS value of %f. Minimum pass is %f.', rms_val, sox_threshold) if rms_val < sox_threshold: raise error.TestFail( 'Audio RMS value %f too low. Minimum pass is %f.' % (rms_val, sox_threshold)) def noise_reduce_file(in_file, noise_file, out_file, sox_format=_DEFAULT_SOX_FORMAT): """Runs the sox command to reduce noise. Runs the sox command to noise-reduce in_file using the noise profile from noise_file. @param in_file: The file to noise reduce. @param noise_file: The file containing the noise profile. This can be created by recording silence. @param out_file: The file contains the noise reduced sound. @param sox_format: The sox format to generate sox command. """ prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH, sox_format, noise_file) reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' % (SOX_PATH, sox_format, in_file, sox_format, out_file)) utils.system('%s | %s' % (prof_cmd, reduce_cmd)) def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND): """Records a sample from the default input device. @param tmpfile: The file to record to. @param record_command: The command to record audio. """ utils.system('%s %s' % (record_command, tmpfile)) def create_wav_file(wav_dir, prefix=""): """Creates a unique name for wav file. The created file name will be preserved in autotest result directory for future analysis. @param wav_dir: The directory of created wav file. @param prefix: specified file name prefix. """ filename = "%s-%s.wav" % (prefix, time.time()) return os.path.join(wav_dir, filename) def run_in_parallel(*funs): """Runs methods in parallel. @param funs: methods to run. """ threads = [] for f in funs: t = threading.Thread(target=f) t.start() threads.append(t) for t in threads: t.join() def loopback_test_channels(noise_file_name, wav_dir, playback_callback=None, check_recorded_callback=check_audio_rms, preserve_test_file=True, num_channels = _DEFAULT_NUM_CHANNELS, record_callback=record_sample, mix_callback=None): """Tests loopback on all channels. @param noise_file_name: Name of the file contains pre-recorded noise. @param wav_dir: The directory of created wav file. @param playback_callback: The callback to do the playback for one channel. @param record_callback: The callback to do the recording. @param check_recorded_callback: The callback to check recorded file. @param preserve_test_file: Retain the recorded files for future debugging. @param num_channels: The number of total channels to test. @param mix_callback: The callback to do on the one-channel file. """ for channel in xrange(num_channels): record_file_name = create_wav_file(wav_dir, "record-%d" % channel) functions = [lambda: record_callback(record_file_name)] if playback_callback: functions.append(lambda: playback_callback(channel)) if mix_callback: mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel) functions.append(lambda: mix_callback(mix_file_name)) run_in_parallel(*functions) if mix_callback: sox_output_mix = sox_stat_output(mix_file_name, channel) rms_val_mix = get_audio_rms(sox_output_mix) logging.info('Got mixed audio RMS value of %f.', rms_val_mix) sox_output_record = sox_stat_output(record_file_name, channel) rms_val_record = get_audio_rms(sox_output_record) logging.info('Got recorded audio RMS value of %f.', rms_val_record) reduced_file_name = create_wav_file(wav_dir, "reduced-%d" % channel) noise_reduce_file(record_file_name, noise_file_name, reduced_file_name) sox_output_reduced = sox_stat_output(reduced_file_name, channel) if not preserve_test_file: os.unlink(reduced_file_name) os.unlink(record_file_name) if mix_callback: os.unlink(mix_file_name) check_recorded_callback(sox_output_reduced) def get_channel_sox_stat( input_audio, channel_index, channels=2, bits=16, rate=48000): """Gets the sox stat info of the selected channel in the input audio file. @param input_audio: The input audio file to be analyzed. @param channel_index: The index of the channel to be analyzed. (1 for the first channel). @param channels: The number of channels in the input audio. @param bits: The number of bits of each audio sample. @param rate: The sampling rate. """ if channel_index <= 0 or channel_index > channels: raise ValueError('incorrect channel_indexi: %d' % channel_index) if channels == 1: return sox_utils.get_stat( input_audio, channels=channels, bits=bits, rate=rate) p1 = cmd_utils.popen( sox_utils.extract_channel_cmd( input_audio, '-', channel_index, channels=channels, bits=bits, rate=rate), stdout=cmd_utils.PIPE) p2 = cmd_utils.popen( sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate), stdin=p1.stdout, stderr=cmd_utils.PIPE) stat_output = p2.stderr.read() cmd_utils.wait_and_check_returncode(p1, p2) return sox_utils.parse_stat_output(stat_output) def get_rms(input_audio, channels=1, bits=16, rate=48000): """Gets the RMS values of all channels of the input audio. @param input_audio: The input audio file to be checked. @param channels: The number of channels in the input audio. @param bits: The number of bits of each audio sample. @param rate: The sampling rate. """ stats = [get_channel_sox_stat( input_audio, i + 1, channels=channels, bits=bits, rate=rate) for i in xrange(channels)] logging.info('sox stat: %s', [str(s) for s in stats]) return [s.rms for s in stats] def reduce_noise_and_get_rms( input_audio, noise_file, channels=1, bits=16, rate=48000): """Reduces noise in the input audio by the given noise file and then gets the RMS values of all channels of the input audio. @param input_audio: The input audio file to be analyzed. @param noise_file: The noise file used to reduce noise in the input audio. @param channels: The number of channels in the input audio. @param bits: The number of bits of each audio sample. @param rate: The sampling rate. """ with tempfile.NamedTemporaryFile() as reduced_file: p1 = cmd_utils.popen( sox_utils.noise_profile_cmd( noise_file, '-', channels=channels, bits=bits, rate=rate), stdout=cmd_utils.PIPE) p2 = cmd_utils.popen( sox_utils.noise_reduce_cmd( input_audio, reduced_file.name, '-', channels=channels, bits=bits, rate=rate), stdin=p1.stdout) cmd_utils.wait_and_check_returncode(p1, p2) return get_rms(reduced_file.name, channels, bits, rate) def skip_devices_to_test(*boards): """Devices to skip due to hardware or test compatibility issues. @param boards: the boards to skip testing. """ # TODO(scottz): Remove this when crbug.com/220147 is fixed. dut_board = utils.get_current_board() if dut_board in boards: raise error.TestNAError('This test is not available on %s' % dut_board) def cras_rms_test_setup(): """Setups for the cras_rms_tests. To make sure the line_out-to-mic_in path is all green. """ # TODO(owenlin): Now, the nodes are choosed by chrome. # We should do it here. cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME) cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME) cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN) cras_utils.set_system_mute(False) cras_utils.set_capture_mute(False) def generate_rms_postmortem(): """Generates postmortem for rms tests.""" try: logging.info('audio postmortem report') log_loopback_dongle_status() logging.info(get_audio_diagnostics()) except Exception: logging.exception('Error while generating postmortem report') def get_audio_diagnostics(): """Gets audio diagnostic results. @returns: a string containing diagnostic results. """ return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=cmd_utils.PIPE) def get_max_cross_correlation(signal_a, signal_b): """Gets max cross-correlation and best time delay of two signals. Computes cross-correlation function between two signals and gets the maximum value and time delay. The steps includes: 1. Compute cross-correlation function of X and Y and get Cxy. The correlation function Cxy is an array where Cxy[k] is the cross product of X and Y when Y is delayed by k. Refer to manual of numpy.correlate for detail of correlation. 2. Find the maximum value C_max and index C_index in Cxy. 3. Compute L2 norm of X and Y to get norm(X) and norm(Y). 4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation. Max cross-correlation indicates the similarity of X and Y. The value is 1 if X equals Y multiplied by a positive scalar. The value is -1 if X equals Y multiplied by a negative scaler. Any constant level shift will be regarded as distortion and will make max cross-correlation value deviated from 1. C_index is the best time delay of Y that make Y looks similar to X. Refer to http://en.wikipedia.org/wiki/Cross-correlation. @param signal_a: A list of numbers which contains the first signal. @param signal_b: A list of numbers which contains the second signal. @raises: ValueError if any number in signal_a or signal_b is not a float. ValueError if norm of any array is less than _MINIMUM_NORM. @returns: A tuple (correlation index, best delay). If there are more than one best delay, just return the first one. """ def check_list_contains_float(numbers): """Checks the elements in a list are all float. @param numbers: A list of numbers. @raises: ValueError if there is any element which is not a float in the list. """ if any(not isinstance(x, float) for x in numbers): raise ValueError('List contains number which is not a float') check_list_contains_float(signal_a) check_list_contains_float(signal_b) norm_a = numpy.linalg.norm(signal_a) norm_b = numpy.linalg.norm(signal_b) logging.debug('norm_a: %f', norm_a) logging.debug('norm_b: %f', norm_b) if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM: raise ValueError('No meaningful data as norm is too small.') correlation = numpy.correlate(signal_a, signal_b, 'full') max_correlation = max(correlation) best_delays = [i for i, j in enumerate(correlation) if j == max_correlation] if len(best_delays) > 1: logging.warning('There are more than one best delay: %r', best_delays) return max_correlation / (norm_a * norm_b), best_delays[0] def trim_data(data, threshold=0): """Trims a data by removing value that is too small in head and tail. Removes elements in head and tail whose absolute value is smaller than or equal to threshold. E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) = ([0.2, 0.3, 0.2], 2) @param data: A list of numbers. @param threshold: The threshold to compare against. @returns: A tuple (trimmed_data, end_trimmed_length), where end_trimmed_length is the length of original data being trimmed from the end. Returns ([], None) if there is no valid data. """ indice_valid = [ i for i, j in enumerate(data) if abs(j) > threshold] if not indice_valid: logging.warning( 'There is no element with absolute value greater ' 'than threshold %f', threshold) return [], None logging.debug('Start and end of indice_valid: %d, %d', indice_valid[0], indice_valid[-1]) end_trimmed_length = len(data) - indice_valid[-1] - 1 logging.debug('Trimmed length in the end: %d', end_trimmed_length) return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length) def get_one_channel_correlation(test_data, golden_data): """Gets max cross-correlation of test_data and golden_data. Trims test data and compute the max cross-correlation against golden_data. Signal can be trimmed because those zero values in the head and tail of a signal will not affect correlation computation. @param test_data: A list containing the data to compare against golden data. @param golden_data: A list containing the golden data. @returns: A tuple (max cross-correlation, best_delay) if data is valid. Otherwise returns (None, None). Refer to docstring of get_max_cross_correlation. """ trimmed_test_data, end_trimmed_length = trim_data(test_data) def to_float(samples): """Casts elements in the list to float. @param samples: A list of numbers. @returns: A list of original numbers casted to float. """ samples_float = [float(x) for x in samples] return samples_float max_cross_correlation, best_delay = get_max_cross_correlation( to_float(golden_data), to_float(trimmed_test_data)) # The reason to add back the trimmed length in the end. # E.g.: # golden data: # # |-----------vvvv----------------| vvvv is the signal of interest. # a b # # test data: # # |---x----vvvv--------x----------------| x is the place to trim. # c d e f # # trimmed test data: # # |----vvvv--------| # d e # # The first output of cross correlation computation : # # |-----------vvvv----------------| # a b # # |----vvvv--------| # d e # # The largest output of cross correlation computation happens at # delay a + e. # # |-----------vvvv----------------| # a b # # |----vvvv--------| # d e # # Cross correlation starts computing by aligning the last sample # of the trimmed test data to the first sample of golden data. # The best delay calculated from trimmed test data and golden data # cross correlation is e + a. But the real best delay that should be # identical on two channel should be e + a + f. # So we need to add back the length being trimmed in the end. if max_cross_correlation: return max_cross_correlation, best_delay + end_trimmed_length else: return None, None def compare_one_channel_correlation(test_data, golden_data, parameters): """Compares two one-channel data by correlation. @param test_data: A list containing the data to compare against golden data. @param golden_data: A list containing the golden data. @param parameters: A dict containing parameters for method. @returns: A dict containing: index: The index of similarity where 1 means they are different only by a positive scale. best_delay: The best delay of test data in relative to golden data. equal: A bool containing comparing result. """ if 'correlation_threshold' in parameters: threshold = parameters['correlation_threshold'] else: threshold = _CORRELATION_INDEX_THRESHOLD result_dict = dict() max_cross_correlation, best_delay = get_one_channel_correlation( test_data, golden_data) result_dict['index'] = max_cross_correlation result_dict['best_delay'] = best_delay result_dict['equal'] = True if ( max_cross_correlation and max_cross_correlation > threshold) else False logging.debug('result_dict: %r', result_dict) return result_dict def get_one_channel_stat(data, data_format): """Gets statistic information of data. @param data: A list containing one channel data. @param data_format: A dict containing data format of data. @return: The sox stat parsed result. An object containing sameple_count: An int. Samples read. length: A float. Length in seconds. rms: A float. RMS amplitude. rough_frequency: A float. Rough frequency. """ if not data: raise ValueError('Data is empty. Can not get stat') raw_data = audio_data.AudioRawData( binary=None, channel=1, sample_format=data_format['sample_format']) raw_data.copy_channel_data([data]) with tempfile.NamedTemporaryFile() as raw_data_file: raw_data_path = raw_data_file.name raw_data.write_to_file(raw_data_path) bits = 8 * (audio_data.SAMPLE_FORMATS[ data_format['sample_format']]['size_bytes']) stat = sox_utils.get_stat(raw_data_path, channels=1, bits=bits, rate=data_format['rate']) return stat def compare_one_channel_frequency(test_data, test_data_format, golden_data, golden_data_format): """Compares two one-channel data by frequency. @param test_data: A list containing the data to compare against golden data. @param test_data_format: A dict containing data format of test data. @param golden_data: A list containing the golden data. @param golden_data_format: A dict containing data format of golden data. @returns: A dict containing: test_data_frequency: test data frequency. golden_data_frequency: golden data frequency. equal: A bool containing comparing result. @raises: ValueError if the test data RMS is too small to be meaningful. """ result_dict = dict() golden_data_stat = get_one_channel_stat(golden_data, golden_data_format) logging.info('Get golden data one channel stat: %s', golden_data_stat) test_data_stat = get_one_channel_stat(test_data, test_data_format) logging.info('Get test data one channel stat: %s', test_data_stat) result_dict['golden_data_frequency'] = golden_data_stat.rough_frequency result_dict['test_data_frequency'] = test_data_stat.rough_frequency result_dict['equal'] = True if ( abs(result_dict['test_data_frequency'] - result_dict['golden_data_frequency']) < _FREQUENCY_DIFF_THRESHOLD ) else False logging.debug('result_dict: %r', result_dict) if test_data_stat.rms < _MEANINGFUL_RMS_THRESHOLD: raise ValueError('Recorded RMS %f is too small to be meaningful.', test_data_stat.rms) return result_dict def compare_one_channel_data(test_data, test_data_format, golden_data, golden_data_format, method, parameters): """Compares two one-channel data. @param test_data: A list containing the data to compare against golden data. @param test_data_format: The data format of test data. @param golden_data: A list containing the golden data. @param golden_data_format: The data format of golden data. @param method: The comparing method. Currently only 'correlation' is supported. @param parameters: A dict containing parameters for method. @returns: A dict containing: index: The index of similarity where 1 means they are different only by a positive scale. best_delay: The best delay of test data in relative to golden data. equal: A bool containing comparing result. @raises: NotImplementedError if method is not supported. """ if method == 'correlation': return compare_one_channel_correlation(test_data, golden_data, parameters) if method == 'frequency': return compare_one_channel_frequency( test_data, test_data_format, golden_data, golden_data_format) raise NotImplementedError('method %s is not implemented' % method) def compare_data(golden_data_binary, golden_data_format, test_data_binary, test_data_format, channel_map, method, parameters=None): """Compares two raw data. @param golden_data_binary: The binary containing golden data. @param golden_data_format: The data format of golden data. @param test_data_binary: The binary containing test data. @param test_data_format: The data format of test data. @param channel_map: A list containing channel mapping. E.g. [1, 0, None, None, None, None, None, None] means channel 0 of test data should map to channel 1 of golden data. Channel 1 of test data should map to channel 0 of golden data. Channel 2 to 7 of test data should be skipped. @param method: The method to compare data. Use 'correlation' to compare general data. Use 'frequency' to compare data containing sine wave. @param parameters: A dict containing parameters for method, if needed. @returns: A boolean for compare result. @raises: NotImplementedError if file type is not raw. NotImplementedError if sampling rates of two data are not the same. """ if parameters is None: parameters = dict() if (golden_data_format['file_type'] != 'raw' or test_data_format['file_type'] != 'raw'): raise NotImplementedError('Only support raw data in compare_data.') if (golden_data_format['rate'] != test_data_format['rate']): raise NotImplementedError( 'Only support comparing data with the same sampling rate') golden_data = audio_data.AudioRawData( binary=golden_data_binary, channel=golden_data_format['channel'], sample_format=golden_data_format['sample_format']) test_data = audio_data.AudioRawData( binary=test_data_binary, channel=test_data_format['channel'], sample_format=test_data_format['sample_format']) compare_results = [] for test_channel, golden_channel in enumerate(channel_map): if golden_channel is None: logging.info('Skipped channel %d', test_channel) continue test_data_one_channel = test_data.channel_data[test_channel] golden_data_one_channel = golden_data.channel_data[golden_channel] result_dict = dict(test_channel=test_channel, golden_channel=golden_channel) result_dict.update( compare_one_channel_data( test_data_one_channel, test_data_format, golden_data_one_channel, golden_data_format, method, parameters)) compare_results.append(result_dict) logging.info('compare_results: %r', compare_results) return_value = False if not compare_results else True for result in compare_results: if not result['equal']: logging.error( 'Failed on test channel %d and golden channel %d', result['test_channel'], result['golden_channel']) return_value = False # Also checks best delay are exactly the same. if method == 'correlation': best_delays = set([result['best_delay'] for result in compare_results]) if len(best_delays) > 1: logging.error('There are more than one best delay.') return_value = False return return_value class _base_rms_test(test.test): """Base class for all rms_test """ def postprocess(self): super(_base_rms_test, self).postprocess() # Sum up the number of failed constraints in each iteration if sum(len(x) for x in self.failed_constraints): generate_rms_postmortem() class chrome_rms_test(_base_rms_test): """Base test class for audio RMS test with Chrome. The chrome instance can be accessed by self.chrome. """ def warmup(self): skip_devices_to_test('x86-mario') super(chrome_rms_test, self).warmup() # Not all client of this file using telemetry. # Just do the import here for those who really need it. from autotest_lib.client.common_lib.cros import chrome self.chrome = chrome.Chrome() # The audio configuration could be changed when we # restart chrome. try: cras_rms_test_setup() except Exception: self.chrome.browser.Close() raise def cleanup(self, *args): try: self.chrome.browser.Close() finally: super(chrome_rms_test, self).cleanup() class cras_rms_test(_base_rms_test): """Base test class for CRAS audio RMS test.""" def warmup(self): skip_devices_to_test('x86-mario') super(cras_rms_test, self).warmup() cras_rms_test_setup() def alsa_rms_test_setup(): """Setup for alsa_rms_test. Different boards/chipsets have different set of mixer controls. Even controls that have the same name on different boards might have different capabilities. The following is a general idea to setup a given class of boards, and some specialized setup for certain boards. """ card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic') arch = utils.get_arch() board = utils.get_board() uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090') if board in ['daisy_spring', 'daisy_skate']: # The MIC controls of the boards do not support dB syntax. alsa_utils.mixer_cmd(card_id, 'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME) alsa_utils.mixer_cmd(card_id, 'sset MIC1 ' + _DEFAULT_ALSA_MAX_VOLUME) alsa_utils.mixer_cmd(card_id, 'sset MIC2 ' + _DEFAULT_ALSA_MAX_VOLUME) elif arch in ['armv7l', 'aarch64'] or uses_max98090: # ARM platforms or Intel platforms that uses max98090 codec driver. alsa_utils.mixer_cmd(card_id, 'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME) alsa_utils.mixer_cmd(card_id, 'sset MIC1 ' + _DEFAULT_ALSA_CAPTURE_GAIN) alsa_utils.mixer_cmd(card_id, 'sset MIC2 ' + _DEFAULT_ALSA_CAPTURE_GAIN) else: # The rest of Intel platforms. alsa_utils.mixer_cmd(card_id, 'sset Master ' + _DEFAULT_ALSA_MAX_VOLUME) alsa_utils.mixer_cmd(card_id, 'sset Capture ' + _DEFAULT_ALSA_CAPTURE_GAIN) class alsa_rms_test(_base_rms_test): """Base test class for ALSA audio RMS test.""" def warmup(self): skip_devices_to_test('x86-mario') super(alsa_rms_test, self).warmup() alsa_rms_test_setup()