# Copyright 2013 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import its.error
import os
import os.path
import sys
import re
import json
import time
import unittest
import socket
import subprocess
import hashlib
import numpy
class ItsSession(object):
"""Controls a device over adb to run ITS scripts.
The script importing this module (on the host machine) prepares JSON
objects encoding CaptureRequests, specifying sets of parameters to use
when capturing an image using the Camera2 APIs. This class encapsualtes
sending the requests to the device, monitoring the device's progress, and
copying the resultant captures back to the host machine when done. TCP
forwarded over adb is the transport mechanism used.
The device must have ItsService.apk installed.
Attributes:
sock: The open socket.
"""
# TODO: Handle multiple connected devices.
# The adb program is used for communication with the device. Need to handle
# the case of multiple devices connected. Currently, uses the "-d" param
# to adb, which causes it to fail if there is more than one device.
ADB = "adb -d"
# Open a connection to localhost:6000, forwarded to port 6000 on the device.
# TODO: Support multiple devices running over different TCP ports.
IPADDR = '127.0.0.1'
PORT = 6000
BUFFER_SIZE = 4096
# Seconds timeout on each socket operation.
SOCK_TIMEOUT = 10.0
PACKAGE = 'com.android.camera2.its'
INTENT_START = 'com.android.camera2.its.START'
def __init__(self):
reboot_device_on_argv()
# TODO: Figure out why "--user 0" is needed, and fix the problem
_run('%s shell am force-stop --user 0 %s' % (self.ADB, self.PACKAGE))
_run(('%s shell am startservice --user 0 -t text/plain '
'-a %s') % (self.ADB, self.INTENT_START))
_run('%s forward tcp:%d tcp:%d' % (self.ADB,self.PORT,self.PORT))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.IPADDR, self.PORT))
self.sock.settimeout(self.SOCK_TIMEOUT)
def __del__(self):
if self.sock:
self.sock.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
return False
def __read_response_from_socket(self):
# Read a line (newline-terminated) string serialization of JSON object.
chars = []
while len(chars) == 0 or chars[-1] != '\n':
chars.append(self.sock.recv(1))
line = ''.join(chars)
jobj = json.loads(line)
# Optionally read a binary buffer of a fixed size.
buf = None
if jobj.has_key("bufValueSize"):
n = jobj["bufValueSize"]
buf = bytearray(n)
view = memoryview(buf)
while n > 0:
nbytes = self.sock.recv_into(view, n)
view = view[nbytes:]
n -= nbytes
buf = numpy.frombuffer(buf, dtype=numpy.uint8)
return jobj, buf
def get_camera_properties(self):
"""Get the camera properties object for the device.
Returns:
The Python dictionary object for the CameraProperties object.
"""
cmd = {}
cmd["cmdName"] = "getCameraProperties"
self.sock.send(json.dumps(cmd) + "\n")
data,_ = self.__read_response_from_socket()
if data['tag'] != 'cameraProperties':
raise its.error.Error('Invalid command response')
return data['objValue']['cameraProperties']
def do_3a(self, region_ae, region_awb, region_af,
do_ae=True, do_awb=True, do_af=True):
"""Perform a 3A operation on the device.
Triggers some or all of AE, AWB, and AF, and returns once they have
converged. Uses the vendor 3A that is implemented inside the HAL.
Throws an assertion if 3A fails to converge.
Args:
region_ae: Normalized rect. (x,y,w,h) specifying the AE region.
region_awb: Normalized rect. (x,y,w,h) specifying the AWB region.
region_af: Normalized rect. (x,y,w,h) specifying the AF region.
Returns:
Five values:
* AE sensitivity; None if do_ae is False
* AE exposure time; None if do_ae is False
* AWB gains (list); None if do_awb is False
* AWB transform (list); None if do_awb is false
* AF focus position; None if do_af is false
"""
print "Running vendor 3A on device"
cmd = {}
cmd["cmdName"] = "do3A"
cmd["regions"] = {"ae": region_ae, "awb": region_awb, "af": region_af}
cmd["triggers"] = {"ae": do_ae, "af": do_af}
self.sock.send(json.dumps(cmd) + "\n")
# Wait for each specified 3A to converge.
ae_sens = None
ae_exp = None
awb_gains = None
awb_transform = None
af_dist = None
while True:
data,_ = self.__read_response_from_socket()
vals = data['strValue'].split()
if data['tag'] == 'aeResult':
ae_sens, ae_exp = [int(i) for i in vals]
elif data['tag'] == 'afResult':
af_dist = float(vals[0])
elif data['tag'] == 'awbResult':
awb_gains = [float(f) for f in vals[:4]]
awb_transform = [float(f) for f in vals[4:]]
elif data['tag'] == '3aDone':
break
else:
raise its.error.Error('Invalid command response')
if (do_ae and ae_sens == None or do_awb and awb_gains == None
or do_af and af_dist == None):
raise its.error.Error('3A failed to converge')
return ae_sens, ae_exp, awb_gains, awb_transform, af_dist
def do_capture(self, cap_request, out_surface=None):
"""Issue capture request(s), and read back the image(s) and metadata.
The main top-level function for capturing one or more images using the
device. Captures a single image if cap_request is a single object, and
captures a burst if it is a list of objects.
The out_surface field can specify the width, height, and format of
the captured image. The format may be "yuv" or "jpeg". The default is
a YUV420 frame ("yuv") corresponding to a full sensor frame.
Example of a single capture request:
{
"android.sensor.exposureTime": 100*1000*1000,
"android.sensor.sensitivity": 100
}
Example of a list of capture requests:
[
{
"android.sensor.exposureTime": 100*1000*1000,
"android.sensor.sensitivity": 100
},
{
"android.sensor.exposureTime": 100*1000*1000,
"android.sensor.sensitivity": 200
}
]
Example of an output surface specification:
{
"width": 640,
"height": 480,
"format": "yuv"
}
Args:
cap_request: The Python dict/list specifying the capture(s), which
will be converted to JSON and sent to the device.
out_surface: (Optional) the width,height,format to use for all
captured images.
Returns:
An object or list of objects (depending on whether the request was
for a single or burst capture), where each object contains the
following fields:
* data: the image data as a numpy array of bytes.
* width: the width of the captured image.
* height: the height of the captured image.
* format: the format of the image, in ["yuv", "jpeg"].
* metadata: the capture result object (Python dictionaty).
"""
cmd = {}
cmd["cmdName"] = "doCapture"
if not isinstance(cap_request, list):
cmd["captureRequests"] = [cap_request]
else:
cmd["captureRequests"] = cap_request
if out_surface is not None:
cmd["outputSurface"] = out_surface
n = len(cmd["captureRequests"])
print "Capturing %d image%s" % (n, "s" if n>1 else "")
self.sock.send(json.dumps(cmd) + "\n")
# Wait for n images and n metadata responses from the device.
bufs = []
mds = []
fmts = []
width = None
height = None
while len(bufs) < n or len(mds) < n:
jsonObj,buf = self.__read_response_from_socket()
if jsonObj['tag'] in ['jpegImage','yuvImage'] and buf is not None:
bufs.append(buf)
fmts.append(jsonObj['tag'][:-5])
elif jsonObj['tag'] == 'captureResults':
mds.append(jsonObj['objValue']['captureResult'])
width = jsonObj['objValue']['width']
height = jsonObj['objValue']['height']
objs = []
for i in range(n):
obj = {}
obj["data"] = bufs[i]
obj["width"] = width
obj["height"] = height
obj["format"] = fmts[i]
obj["metadata"] = mds[i]
objs.append(obj)
return objs if n>1 else objs[0]
def _run(cmd):
"""Replacement for os.system, with hiding of stdout+stderr messages.
"""
with open(os.devnull, 'wb') as devnull:
subprocess.check_call(
cmd.split(), stdout=devnull, stderr=subprocess.STDOUT)
def reboot_device(sleep_duration=30):
"""Function to reboot a device and block until it is ready.
Can be used at the start of a test to get the device into a known good
state. Will disconnect any other adb sessions, so this function is not
a part of the ItsSession class (which encapsulates a session with a
device.)
Args:
sleep_duration: (Optional) the length of time to sleep (seconds) after
the device comes online before returning; this gives the device
time to finish booting.
"""
print "Rebooting device"
_run("%s reboot" % (ItsSession.ADB));
_run("%s wait-for-device" % (ItsSession.ADB))
time.sleep(sleep_duration)
print "Reboot complete"
def reboot_device_on_argv():
"""Examine sys.argv, and reboot if the "reboot" arg is present.
If the script command line contains either:
reboot
reboot=30
then the device will be rebooted, and if the optional numeric arg is
present, then that will be the sleep duration passed to the reboot
call.
Returns:
Boolean, indicating whether the device was rebooted.
"""
for s in sys.argv[1:]:
if s[:6] == "reboot":
if len(s) > 7 and s[6] == "=":
duration = int(s[7:])
reboot_device(duration)
elif len(s) == 6:
reboot_device()
return True
return False
class __UnitTest(unittest.TestCase):
"""Run a suite of unit tests on this module.
"""
# TODO: Add some unit tests.
None
if __name__ == '__main__':
unittest.main()