# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
class ChameleonVideoCapturer(object):
"""
Wraps around chameleon APIs to provide an easy way to capture video frames.
"""
def __init__(self, chameleon_port, display_facade,
timeout_get_all_frames_s=60):
self.chameleon_port = chameleon_port
self.display_facade = display_facade
self.timeout_get_all_frames_s = timeout_get_all_frames_s
self._checksums = []
self.was_plugged = None
def __enter__(self):
self.was_plugged = self.chameleon_port.plugged
if not self.was_plugged:
self.chameleon_port.plug()
self.chameleon_port.wait_video_input_stable()
return self
def capture(self, player, max_frame_count, box=None):
"""
Captures frames upto max_frame_count, saves the image with filename
same as the index of the frame in the frame buffer.
@param player: object, VimeoPlayer or NativeHTML5Player
@param max_frame_count: int, maximum total number of frames to capture.
@param box: int tuple, left, upper, right, lower pixel coordinates.
Defines the rectangular boundary within which to compare.
@return: list of paths of captured images.
"""
self.capture_only(player, max_frame_count, box)
# each checksum should be saved with a filename that is its index
ind_paths = {i : str(i) for i in self.checksums}
return self.write_images(ind_paths)
def capture_only(self, player, max_frame_count, box=None):
"""
Asynchronously begins capturing video frames. Stops capturing when the
number of frames captured is equal or more than max_frame_count. Does
save the images, gets only the checksums.
@param player: VimeoPlayer or NativeHTML5Player.
@param max_frame_count: int, the maximum number of frames we want.
@param box: int tuple, left, upper, right, lower pixel coordinates.
Defines the rectangular boundary within which to compare.
@return: list of checksums
"""
if not box:
box = self.box
self.chameleon_port.start_capturing_video(box)
player.play()
error_msg = "Expected current time to be > 1 seconds"
utils.poll_for_condition(lambda : player.currentTime() > 1,
timeout=5,
sleep_interval=0.01,
exception=error.TestError(error_msg))
error_msg = "Couldn't get the right number of frames"
utils.poll_for_condition(
lambda: self.chameleon_port.get_captured_frame_count() >=
max_frame_count,
error.TestError(error_msg),
self.timeout_get_all_frames_s,
sleep_interval=0.01)
self.chameleon_port.stop_capturing_video()
self.checksums = self.chameleon_port.get_captured_checksums()
count = self.chameleon_port.get_captured_frame_count()
# Due to the polling and asychronous calls we might get too many frames
# cap at max
del self.checksums[max_frame_count:]
logging.debug("***# of frames received %s", count)
logging.debug("Checksums before chopping repeated ones")
for c in self.checksums:
logging.debug(c)
# Find the first frame that is different from previous ones. This
# represents the start of 'interesting' frames
first_index = 0
for i in xrange(1, count):
if self.checksums[0] != self.checksums[i]:
first_index = i
break
logging.debug("*** First interesting frame at index = %s", first_index)
self.checksums = self.checksums[first_index:]
return self.checksums
def write_images(self, frame_indices, dest_dir, image_format):
"""
Saves frames of given indices to disk. The filename of the frame will be
index in the list.
@param frame_indices: list of frame indices to save.
@param dest_dir: path to the desired destination dir.
@param image_format: string, format to save the image as. e.g; PNG
@return: list of file paths
"""
if type(frame_indices) is not list:
frame_indices = [frame_indices]
test_images = []
curr_checksum = None
for i, frame_index in enumerate(frame_indices):
path = os.path.join(dest_dir, str(i) + '.' + image_format)
# previous is what was current in the previous iteration
prev_checksum = curr_checksum
curr_checksum = self.checksums[frame_index]
if curr_checksum == prev_checksum:
logging.debug("Image the same as previous image, copy it.")
else:
logging.debug("Read frame %d, store as %s.", i, path)
curr_img = self.chameleon_port.read_captured_frame(frame_index)
curr_img.save(path)
test_images.append(path)
return test_images
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.was_plugged:
self.chameleon_port.unplug()