# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides the utilities for avsync_probe's data processing.

We will get a lot of raw data from the avsync_probe.Capture(). One data per
millisecond.
AVSyncProbeDataParser will help to transform the raw data to more readable
formats. It also helps to calculate the audio/video sync timing if the
sound_interval_frames parameter is not None.

Example:
    capture_data = avsync_probe.Capture(12)
    parser = avsync_probe_utils.AVSyncProbeDataParser(self.resultsdir,
            capture_data, 30)

    # Use the following attributes to access data. They can be referenced in
    # AVSyncProbeDataParser Class.
    parser.video_duration_average
    parser.video_duration_std
    parser.sync_duration_average
    parser.sync_duration_std
    parser.cumulative_frame_count
    parser.dropped_frame_count
    parser.corrupted_frame_count
    parser.binarize_data
    parser.audio_events
    parser.video_events

"""

import collections
import logging
import math
import os
import sys


# Indices for binarize_data, audio_events and video_events.
TIME_INDEX = 0
VIDEO_INDEX = 1
AUDIO_INDEX = 2
# This index is used for video_events and audio_events.
# The slot contains the time difference to the previous event.
TIME_DIFF_INDEX = 3

# SyncResult namedtuple of audio and video frame.
# time_delay < 0 means that audio comes out first.
# The typename matches the module-level name so that repr() and pickling
# refer to the same identifier.
SyncResult = collections.namedtuple(
        'SyncResult', ['video_time', 'audio_time', 'time_delay'])


class GrayCode(object):
    """Converts bit patterns between binary and Gray code.

    The bit patterns of Gray code values are packed into an int value.
    For example, 4 is "110" in Gray code, which reads "6" when interpreted
    as binary.
    See "https://en.wikipedia.org/wiki/Gray_code"

    """

    @staticmethod
    def binary_to_gray(binary):
        """Binary code to gray code.

        @param binary: Binary code.
        @return: gray code.

        """
        return binary ^ (binary >> 1)

    @staticmethod
    def gray_to_binary(gray):
        """Gray code to binary code.

        The prefix-XOR is folded in halving steps starting at a shift of 16,
        so inputs of up to 32 bits are decoded.

        @param gray: Gray code.
        @return: binary code.

        """
        result = gray
        result ^= (result >> 16)
        result ^= (result >> 8)
        result ^= (result >> 4)
        result ^= (result >> 2)
        result ^= (result >> 1)
        return result


class HysteresisSwitch(object):
    """
    Iteratively binarizes input sequence using hysteresis comparator with a
    pair of fixed thresholds.

    Hysteresis means to use 2 different thresholds for activating and
    de-activating output. It is often used for thresholding time-series
    signal while reducing small noise in the input.

    Note that the low threshold is exclusive but the high threshold is
    inclusive.
    When the same values were applied for the both, the object works as a
    non-hysteresis switch. (i.e. equivalent to the >= operator).

    """

    def __init__(self, low_threshold, high_threshold, init_state):
        """Init HysteresisSwitch class.

        @param low_threshold: The threshold value to deactivate the output.
                The comparison is exclusive.
        @param high_threshold: The threshold value to activate the output.
                The comparison is inclusive.
        @param init_state: True or False of the switch initial state.
        @raises ValueError: if low_threshold is greater than high_threshold.

        """
        if low_threshold > high_threshold:
            # Format the message explicitly; %s keeps fractional thresholds
            # intact. ValueError is a subclass of Exception, so callers that
            # catch Exception keep working.
            raise ValueError(
                    'Low threshold %s exceeds the high threshold %s'
                    % (low_threshold, high_threshold))
        self._low_threshold = low_threshold
        self._high_threshold = high_threshold
        self._last_state = init_state

    def adjust_state(self, value):
        """Updates the state of the switch by the input value and returns the
        result.

        Values between the two thresholds leave the state unchanged.

        @param value: value for updating.
        @return the state of the switch.

        """
        if value < self._low_threshold:
            self._last_state = False

        if value >= self._high_threshold:
            self._last_state = True

        return self._last_state


class AVSyncProbeDataParser(object):
    """ Digital information extraction from the raw sensor data sequence.

    This class will transform the raw data to easier understand formats.

    Attributes:
        binarize_data: Transfer the raw data to [Time, video code, is_audio].
                video code is from 0-7 repeatedly.
        video_events: Events of video frame.
        audio_events: Events of when audio happens.
        video_duration_average: (ms) The average duration during video frames.
        video_duration_std: Standard deviation of the video_duration_average.
        sync_duration_average: (ms) The average duration for audio/video sync.
        sync_duration_std: Standard deviation of sync_duration_average.
        cumulative_frame_count: Number of total video frames.
        dropped_frame_count: Total dropped video frames.
        corrupted_frame_count: Total corrupted video frames.

    """
    # Thresholds for hysteresis binarization of input signals.
    # Relative to the minimum (0.0) and maximum (1.0) values of the value
    # range of each input signal.
    _NORMALIZED_LOW_THRESHOLD = 0.6
    _NORMALIZED_HIGH_THRESHOLD = 0.7

    # Video codes are 3 bits wide, so they cycle through 0-7.
    _VIDEO_CODE_CYCLE = (1 << 3)

    def __init__(self, log_dir, capture_raw_data, video_fps,
                 sound_interval_frames=None):
        """Inits AVSyncProbeDataParser class.

        @param log_dir: Directory for dumping each events' contents.
        @param capture_raw_data: Raw data from avsync_probe device.
                A list contains the list values of [timestamp, video0,
                video1, video2, audio].
        @param video_fps: Video frames per second. Used to know if the video
                frame is dropped or just corrupted.
        @param sound_interval_frames: The period of sound (beep) in the number
                of video frames. This class will help to calculate audio/video
                sync stats if sound_interval_frames is not None.

        """
        self.video_duration_average = None
        self.video_duration_std = None
        self.sync_duration_average = None
        self.sync_duration_std = None
        self.cumulative_frame_count = None
        self.dropped_frame_count = None
        self.corrupted_frame_count = None

        self._log_dir = log_dir
        self._raw_data = capture_raw_data
        # Translate to millisecond for each video frame. Float division so
        # the result is not truncated on interpreters where int / int floors.
        self._video_duration = 1000.0 / video_fps
        self._sound_interval_frames = sound_interval_frames
        self._log_list_data_to_file('raw.txt', capture_raw_data)

        self.binarize_data = self._binarize_raw_data()
        # we need to get audio events before remove video preamble frames.
        # Because audio event may appear before the preamble frame, if we
        # remove the preamble frames first, we will lose the audio event.
        self.audio_events = self._detect_audio_events()
        self._remove_video_preamble()
        self.video_events = self._detect_video_events()
        self._analyze_events()
        self._calculate_statistics_report()

    def _log_list_data_to_file(self, filename, data):
        """Log the list data to file.

        It will log under self._log_dir directory, one element per line.

        @param filename: The file name.
        @param data: Data for logging.

        """
        filepath = os.path.join(self._log_dir, filename)
        with open(filepath, 'w') as f:
            for v in data:
                f.write('%s\n' % str(v))

    def _get_hysteresis_switch(self, index):
        """Get HysteresisSwitch by the raw data.

        The thresholds are placed at fixed fractions of the observed
        [min, max] range of the selected column.

        @param index: The index of self._raw_data's element.
        @return: HysteresisSwitch instance by the value of the raw data.

        """
        max_value = max(x[index] for x in self._raw_data)
        min_value = min(x[index] for x in self._raw_data)
        scale = max_value - min_value
        logging.info('index %d, max %d, min %d, scale %d', index, max_value,
                     min_value, scale)
        return HysteresisSwitch(
                min_value + scale * self._NORMALIZED_LOW_THRESHOLD,
                min_value + scale * self._NORMALIZED_HIGH_THRESHOLD,
                False)

    def _binarize_raw_data(self):
        """Conducts adaptive thresholding and decoding embedded frame codes.

        Sensors[0] is timestamp.
        Sensors[1-3] are photo transistors, which outputs lower value for
        brighter light(=white pixels on screen). These are used to detect
        black and white pattern on the screen, and decoded as an integer
        code.

        The final channel is for audio input, which outputs higher voltage
        for larger sound volume. This will be used for detecting beep sounds
        added to the video.

        @return Decoded frame codes list for all the input frames. Each entry
                contains [Timestamp, video code, is_audio].

        """
        decoded_data = []

        # One switch per raw-data column so the list can be indexed by
        # channel number (the switch for column 0, the timestamp, is unused).
        hysteresis_switch = [self._get_hysteresis_switch(i)
                             for i in range(5)]

        for data in self._raw_data:
            code = 0
            # Decode black-and-white pattern on video.
            # There are 3 black or white boxes sensed by the sensors.
            # Each square represents a single bit (white = 1, black = 0)
            # coding an integer in Gray code.
            for i in range(1, 4):
                # Lower sensor value for brighter light(square painted
                # white).
                is_white = not hysteresis_switch[i].adjust_state(data[i])
                if is_white:
                    code |= (1 << (i - 1))
            code = GrayCode.gray_to_binary(code)
            # The final channel is sound signal. Higher sensor value for
            # higher sound level.
            sound = hysteresis_switch[4].adjust_state(data[4])
            decoded_data.append([data[0], code, sound])

        self._log_list_data_to_file('binarize_raw.txt', decoded_data)
        return decoded_data

    def _remove_video_preamble(self):
        """Remove preamble video frames of self.binarize_data.

        Everything up to and including the first run of code-0 samples is
        dropped. StopIteration propagates from next() if no code-0 sample
        exists, or if no non-zero sample follows it.

        """
        # find preamble frame (code = 0)
        index = next(i for i, v in enumerate(self.binarize_data)
                     if v[VIDEO_INDEX] == 0)
        self.binarize_data = self.binarize_data[index:]

        # skip preamble frame (code = 0)
        index = next(i for i, v in enumerate(self.binarize_data)
                     if v[VIDEO_INDEX] != 0)
        self.binarize_data = self.binarize_data[index:]

    def _detect_events(self, detect_condition):
        """Detects events from the binarize data sequence by the
        detect_condition.

        @param detect_condition: callback function for checking event happens.
                This API will pass index and element of binarize_data to the
                callback function.

        @return: The list of events. It's the same as the binarize_data and
                add additional time_difference information.

        """
        detected_events = []
        previous_time = self.binarize_data[0][TIME_INDEX]
        for i, v in enumerate(self.binarize_data):
            if detect_condition(i, v):
                time = v[TIME_INDEX]
                time_difference = time - previous_time
                # Copy a new instance here, because we will append time
                # difference.
                event = list(v)
                event.append(time_difference)
                detected_events.append(event)
                previous_time = time

        return detected_events

    def _detect_audio_events(self):
        """Detects the audio start frame from the binarize data sequence.

        @return: The list of Audio events. It's the same as the binarize_data
                and add additional time_difference information.

        """
        def is_audio_start(i, v):
            """True when sample i is the first sample of a sound."""
            if not v[AUDIO_INDEX]:
                return False
            # The first sample has no predecessor; do not let index -1 wrap
            # around to the last sample. The audio switch starts in the off
            # state, so sound in the first sample counts as a start.
            return i == 0 or not self.binarize_data[i - 1][AUDIO_INDEX]

        # Only check the first audio happen event.
        detected_events = self._detect_events(is_audio_start)

        self._log_list_data_to_file('audio_events.txt', detected_events)
        return detected_events

    def _detect_video_events(self):
        """Detects the video frame from the binarize data sequence.

        @return: The list of Video events. It's the same as the binarize_data
                and add additional time_difference information.

        """
        def is_new_frame(i, v):
            """True when sample i starts a frame with a new code."""
            # The first sample always starts a frame; do not let index -1
            # wrap around and compare it with the last sample.
            return (i == 0 or
                    v[VIDEO_INDEX] != self.binarize_data[i - 1][VIDEO_INDEX])

        # remove duplicate frames. (frames in transition state.)
        detected_events = self._detect_events(is_new_frame)

        self._log_list_data_to_file('video_events.txt', detected_events)
        return detected_events

    def _match_sync(self, video_time):
        """Match the audio/video sync timing.

        This function will find the closest sound in the audio_events to the
        video_time and returns a audio/video sync tuple. On ties the earlier
        entry in audio_events wins. When audio_events is empty, audio_time is
        0 and time_delay is the sys.maxsize sentinel.

        @param video_time: the time of the video which have sound.
        @return A SyncResult namedtuple containing:
                  - timestamp of the video frame which should have audio.
                  - timestamp of nearest audio frame.
                  - time delay between audio and video frame.

        """
        # sys.maxsize exists on both Python 2 and 3 (sys.maxint does not).
        closest_difference = sys.maxsize
        audio_time = 0
        for audio_event in self.audio_events:
            difference = audio_event[TIME_INDEX] - video_time
            if abs(difference) < abs(closest_difference):
                closest_difference = difference
                audio_time = audio_event[TIME_INDEX]
        return SyncResult(video_time, audio_time, closest_difference)

    def _calculate_statistics(self, data):
        """Calculate average and standard deviation of the list data.

        The standard deviation is the population one (divides by len(data)).

        @param data: The list of values to be calculated.
        @return: An tuple with (average, standard_deviation), or
                (None, None) when data is empty.

        """
        if not data:
            return (None, None)

        count = len(data)
        # float() keeps the mean from being truncated for integer samples on
        # interpreters where int / int floors.
        average = float(sum(data)) / count
        variance = sum((v - average) ** 2 for v in data) / count
        standard_deviation = math.sqrt(variance)
        return (average, standard_deviation)

    def _analyze_events(self):
        """Analyze audio/video events.

        This function will analyze video frame status and audio/video sync
        status. It sets cumulative_frame_count, dropped_frame_count,
        corrupted_frame_count and self._sync_events, and logs the sync events
        to 'sync.txt'.

        """
        sound_interval_frames = self._sound_interval_frames
        current_code = 0
        cumulative_frame_count = 0
        dropped_frame_count = 0
        corrupted_frame_count = 0
        sync_events = []

        for v in self.video_events:
            code = v[VIDEO_INDEX]
            time = v[TIME_INDEX]
            # Get difference of the codes.
            # The code is between 0 - 7, so the distance is taken modulo the
            # cycle length.
            frame_diff = (code - current_code) % self._VIDEO_CODE_CYCLE

            if frame_diff != 1:
                # Check if we dropped frame or just got corrupted frame.
                # Treat the frame as corrupted frame if the frame duration is
                # less than 2 video frame duration.
                if v[TIME_DIFF_INDEX] < 2 * self._video_duration:
                    logging.warning('Corrupted frame near %s', str(v))
                    # Correct the code, keeping it inside the 0 - 7 cycle.
                    code = (current_code + 1) % self._VIDEO_CODE_CYCLE
                    corrupted_frame_count += 1
                    frame_diff = 1
                else:
                    logging.warning('Dropped frame near %s', str(v))
                    dropped_frame_count += (frame_diff - 1)

            cumulative_frame_count += frame_diff

            if sound_interval_frames is not None:
                # This frame corresponds to a sound.
                if cumulative_frame_count % sound_interval_frames == 1:
                    sync_events.append(self._match_sync(time))

            current_code = code

        self.cumulative_frame_count = cumulative_frame_count
        self.dropped_frame_count = dropped_frame_count
        self.corrupted_frame_count = corrupted_frame_count
        self._sync_events = sync_events
        self._log_list_data_to_file('sync.txt', sync_events)

    def _calculate_statistics_report(self):
        """Calculates statistics report.

        Fills the video_duration_* attributes from the time differences of
        the video events and the sync_duration_* attributes from the time
        delays of the sync events.

        """
        video_duration_average, video_duration_std = (
                self._calculate_statistics(
                        [v[TIME_DIFF_INDEX] for v in self.video_events]))
        sync_duration_average, sync_duration_std = (
                self._calculate_statistics(
                        [v.time_delay for v in self._sync_events]))
        self.video_duration_average = video_duration_average
        self.video_duration_std = video_duration_std
        self.sync_duration_average = sync_duration_average
        self.sync_duration_std = sync_duration_std