#!/usr/bin/python3 # # Copyright (C) 2015 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import re import subprocess import sys import threading import time from subprocess import PIPE # class for running android device from python # it will fork the device processor class AndroidDevice(object): def __init__(self, serial): self._serial = serial def run_adb_command(self, cmd, timeout=None): adb_cmd = "adb -s %s %s" % (self._serial, cmd) print(adb_cmd) adb_process = subprocess.Popen(args=adb_cmd.split(), bufsize=-1, stderr=PIPE, stdout=PIPE) (out, err) = adb_process.communicate(timeout=timeout) return out.decode('utf-8').strip(), err.decode('utf-8').strip() def run_shell_command(self, cmd): return self.run_adb_command("shell %s" % cmd) def wait_for_device(self, timeout=30): return self.run_adb_command('wait-for-device', timeout) def wait_for_prop(self, key, value, timeout=30): boot_complete = False attempts = 0 wait_period = 1 while not boot_complete and (attempts*wait_period) < timeout: (out, err) = self.run_shell_command("getprop %s" % key) if out == value: boot_complete = True else: time.sleep(wait_period) attempts += 1 if not boot_complete: print("%s not set to %s within timeout!" % (key, value)) return boot_complete def wait_for_service(self, name, timeout=30): service_found = False attempts = 0 wait_period = 1 while not service_found and (attempts*wait_period) < timeout: (output, err) = self.run_shell_command("service check %s" % name) if 'not found' not in output: service_found = True else: time.sleep(wait_period) attempts += 1 if not service_found: print("Service '%s' not found within timeout!" % name) return service_found def wait_for_boot_complete(self, timeout=60): return self.wait_for_prop('dev.bootcomplete', '1', timeout) def install_apk(self, apk_path): self.wait_for_service('package') (out, err) = self.run_adb_command("install -r -d -g %s" % apk_path) result = out.split() return out, err, "Success" in result def uninstall_package(self, package): self.wait_for_service('package') (out, err) = self.run_adb_command("uninstall %s" % package) result = out.split() return "Success" in result def run_instrumentation_test(self, option): self.wait_for_service('activity') return self.run_shell_command("am instrument -w --no-window-animation %s" % option) def is_process_alive(self, process_name): (out, err) = self.run_shell_command("ps") names = out.split() # very lazy implementation as it does not filter out things like uid # should work mostly unless processName is too simple to overlap with # uid. So only use name like com.android.xyz return process_name in names def get_version_sdk(self): return int(self.run_shell_command("getprop ro.build.version.sdk")[0]) def get_version_codename(self): return self.run_shell_command("getprop ro.build.version.codename")[0].strip() def get_density(self): if "emulator" in self._serial: return int(self.run_shell_command("getprop qemu.sf.lcd_density")[0]) else: return int(self.run_shell_command("getprop ro.sf.lcd_density")[0]) def get_orientation(self): return int(self.run_shell_command("dumpsys | grep SurfaceOrientation")[0].split()[1]) def enumerate_android_devices(require_prefix=''): devices = subprocess.check_output(["adb", "devices"]) if not devices: return [] devices = devices.decode('UTF-8').split('\n')[1:] device_list = [] for device in devices: if device is not "" and device.startswith(require_prefix): info = device.split('\t') if info[1] == "device": device_list.append(info[0]) return device_list