普通文本  |  546行  |  19.61 KB

"""BLE tests."""

import time



from mobly import asserts
from mobly import test_runner
from mobly import utils
from utils import android_base_test

# How long, in seconds, the target keeps BLE advertising.
ADVERTISING_TIME = 120
# How long, in seconds, to wait for scan results to be received.
SCAN_TIMEOUT = 5
# How long, in seconds, to wait for a connection to be established.
CONNECTION_TIMEOUT = 20
# How long, in seconds, to wait before cancelling a connection attempt.
CANCEL_CONNECTION_WAIT_TIME = 0.1
# Every test UUID below has the same layout and differs only in the four hex
# digits substituted into this template.
_TEST_UUID_TEMPLATE = '0000%s-0000-1000-8000-00805f9b34fb'
# UUID of the test service.
TEST_BLE_SERVICE_UUID = _TEST_UUID_TEMPLATE % 'fe23'
# UUID of the first write characteristic.
TEST_WRITE_UUID = _TEST_UUID_TEMPLATE % 'e632'
# UUID of the second write characteristic.
TEST_SECOND_WRITE_UUID = _TEST_UUID_TEMPLATE % 'e633'
# UUID of the first read characteristic.
TEST_READ_UUID = _TEST_UUID_TEMPLATE % 'e631'
# UUID of the second read characteristic.
TEST_SECOND_READ_UUID = _TEST_UUID_TEMPLATE % 'e634'
# UUID of the third read characteristic.
TEST_THIRD_READ_UUID = _TEST_UUID_TEMPLATE % 'e635'
# UUID carried in the scan response.
TEST_SCAN_RESPONSE_UUID = _TEST_UUID_TEMPLATE % 'e639'
# Advertise settings in json format for BLE advertising.
ADVERTISE_SETTINGS = {
    'AdvertiseMode': 'ADVERTISE_MODE_LOW_LATENCY',
    # ADVERTISING_TIME is in seconds; the factor of 1000 presumably converts it
    # to milliseconds for the snippet -- confirm against the snippet API.
    'Timeout': 1000 * ADVERTISING_TIME,
    'Connectable': True,
    'TxPowerLevel': 'ADVERTISE_TX_POWER_ULTRA_LOW'
}
# Lengths passed to utils.rand_ascii_str for the random payloads below.
_ADVERTISE_PAYLOAD_LENGTH = 16
_CHARACTERISTIC_PAYLOAD_LENGTH = 8
# Random data representing the device, stored in the advertise data.
DATA = utils.rand_ascii_str(_ADVERTISE_PAYLOAD_LENGTH)
# Random data for the scan response.
SCAN_RESPONSE_DATA = utils.rand_ascii_str(_ADVERTISE_PAYLOAD_LENGTH)
# Random data for the first read operation.
READ_DATA = utils.rand_ascii_str(_CHARACTERISTIC_PAYLOAD_LENGTH)
# Random data for the second read operation.
SECOND_READ_DATA = utils.rand_ascii_str(_CHARACTERISTIC_PAYLOAD_LENGTH)
# Random data for the third read operation.
THIRD_READ_DATA = utils.rand_ascii_str(_CHARACTERISTIC_PAYLOAD_LENGTH)
# Random data for the first write operation.
WRITE_DATA = utils.rand_ascii_str(_CHARACTERISTIC_PAYLOAD_LENGTH)
# Random data for the second write operation.
SECOND_WRITE_DATA = utils.rand_ascii_str(_CHARACTERISTIC_PAYLOAD_LENGTH)
# Advertise data in json format for BLE advertise. It carries DATA as service
# data under the test service UUID.
ADVERTISE_DATA = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_BLE_SERVICE_UUID,
        'Data': DATA
    }]
}
# Advertise data in json format representing scan response for BLE advertise.
# It carries SCAN_RESPONSE_DATA as service data under the scan response UUID.
# The 'IncludeDeviceName' key is spelled exactly as in ADVERTISE_DATA so both
# dicts pass the same option name.
SCAN_RESPONSE = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_SCAN_RESPONSE_UUID,
        'Data': SCAN_RESPONSE_DATA
    }]
}
# Scan filter in json format for BLE scan; it selects the test service UUID.
SCAN_FILTER = {'ServiceUuid': TEST_BLE_SERVICE_UUID}
# Scan settings in json format for BLE scan.
SCAN_SETTINGS = {'ScanMode': 'SCAN_MODE_LOW_LATENCY'}


def _MakeWriteCharacteristic(uuid):
  """Builds the json-format dict of a characteristic with write access.

  Args:
    uuid: string. UUID of the characteristic.

  Returns:
    A new dict with the UUID plus write property and write permission.
  """
  return {
      'UUID': uuid,
      'Property': 'PROPERTY_WRITE',
      'Permission': 'PERMISSION_WRITE'
  }


def _MakeReadCharacteristic(uuid, data):
  """Builds the json-format dict of a characteristic with read access.

  Args:
    uuid: string. UUID of the characteristic.
    data: string. Value stored under the 'Data' key of the characteristic.

  Returns:
    A new dict with the UUID, read property, read permission and the data.
  """
  return {
      'UUID': uuid,
      'Property': 'PROPERTY_READ',
      'Permission': 'PERMISSION_READ',
      'Data': data
  }


# Characteristics for write in json format.
WRITE_CHARACTERISTIC = _MakeWriteCharacteristic(TEST_WRITE_UUID)
SECOND_WRITE_CHARACTERISTIC = _MakeWriteCharacteristic(TEST_SECOND_WRITE_UUID)
# Characteristics for read in json format.
READ_CHARACTERISTIC = _MakeReadCharacteristic(TEST_READ_UUID, READ_DATA)
SECOND_READ_CHARACTERISTIC = _MakeReadCharacteristic(TEST_SECOND_READ_UUID,
                                                     SECOND_READ_DATA)
THIRD_READ_CHARACTERISTIC = _MakeReadCharacteristic(TEST_THIRD_READ_UUID,
                                                    THIRD_READ_DATA)
# Service data in json format for the BLE server: one primary service holding
# every characteristic defined above.
SERVICE = {
    'UUID': TEST_BLE_SERVICE_UUID,
    'Type': 'SERVICE_TYPE_PRIMARY',
    'Characteristics': [
        WRITE_CHARACTERISTIC,
        SECOND_WRITE_CHARACTERISTIC,
        READ_CHARACTERISTIC,
        SECOND_READ_CHARACTERISTIC,
        THIRD_READ_CHARACTERISTIC,
    ]
}
# Names for literal strings used as keys and expected values in event data.
UUID = 'UUID'
GATT_SUCCESS = 'GATT_SUCCESS'
STATE = 'newState'
STATUS = 'status'


def IsRequiredScanResult(scan_result):
  """Tells whether a scan result event belongs to the test advertiser.

  Args:
    scan_result: Scan result event. Its data['result']['ScanRecord']['Services']
    is iterated as a sequence of service entries.

  Returns:
    True if some service entry has the test service UUID together with DATA as
    its 'Data' value, False otherwise.
  """
  services = scan_result.data['result']['ScanRecord']['Services']
  return any(
      entry[UUID] == TEST_BLE_SERVICE_UUID and entry['Data'] == DATA
      for entry in services)


def Discover(scanner, advertiser):
  """Runs BLE advertising and scanning until the scanner finds the advertiser.

  The callback handlers are stored on the devices (advertiser.advertise_callback
  and scanner.scan_callback) so that StopDiscover can use them later. On
  success the advertiser's address is stored in scanner.connect_to_address.

  Args:
    scanner: AndroidDevice. The device that starts ble scan to find target.
    advertiser: AndroidDevice. The device that keeps advertising so other
    devices acknowledge it.

  Steps:
    1. Advertiser starts advertising and gets an onStartSuccess callback.
    2. Scanner starts scanning, sleeps SCAN_TIMEOUT seconds, then waits for an
    onScanResult event accepted by IsRequiredScanResult.

  Verifies:
    The matching scan result holds both the advertise data and the scan
    response data.
  """
  advertiser.advertise_callback = advertiser.android.bleStartAdvertising(
      ADVERTISE_SETTINGS, ADVERTISE_DATA, SCAN_RESPONSE)
  scanner.scan_callback = scanner.android.bleStartScan([SCAN_FILTER],
                                                       SCAN_SETTINGS)
  advertiser.advertise_callback.waitAndGet('onStartSuccess', 30)
  advertiser.log.info('BLE advertising started')
  # Let scan results accumulate before looking for the advertiser's one.
  time.sleep(SCAN_TIMEOUT)
  matching_event = scanner.scan_callback.waitForEvent(
      'onScanResult', IsRequiredScanResult, SCAN_TIMEOUT)
  record = matching_event.data['result']
  advertiser_found = False
  scan_response_found = False
  for entry in record['ScanRecord']['Services']:
    entry_uuid = entry[UUID]
    if entry_uuid == TEST_BLE_SERVICE_UUID and entry['Data'] == DATA:
      # Remember where to connect to in the following GATT steps.
      scanner.connect_to_address = record['Device']['Address']
      advertiser_found = True
    if (entry_uuid == TEST_SCAN_RESPONSE_UUID and
        entry['Data'] == SCAN_RESPONSE_DATA):
      scan_response_found = True
  asserts.assert_true(
      advertiser_found,
      'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT)
  asserts.assert_true(scan_response_found, 'Scan response is not found')
  scanner.log.info('Advertiser is found')


def StopDiscover(scanner, advertiser):
  """Stops the BLE scan and the BLE advertising started by Discover.

  Args:
    scanner: AndroidDevice. The scanning device; its scan_callback attribute
    identifies the scan to stop.
    advertiser: AndroidDevice. The advertising device; its advertise_callback
    attribute identifies the advertising to stop.

  Steps:
    1. Scanner stops scanning.
    2. Advertiser stops advertising.
  """
  scan_callback_id = scanner.scan_callback.callback_id
  scanner.android.bleStopScan(scan_callback_id)
  scanner.log.info('BLE scanning stopped')
  advertise_callback_id = advertiser.advertise_callback.callback_id
  advertiser.android.bleStopAdvertising(advertise_callback_id)
  advertiser.log.info('BLE advertising stopped')


def Connect(client, server):
  """Starts the GATT server and connects the GATT client to it.

  The callback handlers are stored on the devices (server.server_callback and
  client.client_callback) for use by the later GATT steps.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
    connect_to_address attribute is the address to connect to.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Server starts and reports onServiceAdded with GATT_SUCCESS and every
    characteristic of SERVICE.
    2. Client connects to server via Gatt and sleeps CONNECTION_TIMEOUT
    seconds.

  Verifies:
    Client got onConnectionStateChange EXACTLY once, with GATT_SUCCESS and
    STATE_CONNECTED.
  """
  server.server_callback = server.android.bleStartServer([SERVICE])
  service_added = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(service_added.data[STATUS], GATT_SUCCESS)
  added_characteristics = service_added.data['Service']['Characteristics']
  added_uuids = [added[UUID] for added in added_characteristics]
  for expected in SERVICE['Characteristics']:
    expected_uuid = expected[UUID]
    asserts.assert_true(expected_uuid in added_uuids,
                        'Failed to find uuid %s.' % expected_uuid)
  server.log.info('BLE server started')
  client.client_callback = client.android.bleConnectGatt(
      client.connect_to_address)
  # Sleep for the whole timeout rather than waiting for the first event, so
  # that an unexpected extra state change shows up in the count below.
  time.sleep(CONNECTION_TIMEOUT)
  state_changes = client.client_callback.getAll('onConnectionStateChange')
  asserts.assert_equal(len(state_changes), 1)
  state_change = state_changes[0]
  asserts.assert_equal(state_change.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(state_change.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')


def CancelOpen(client, server):
  """Logic for BLE client cancel open and reconnect.

  The callback handlers are stored on the devices (server.server_callback and
  client.client_callback) for use by the later GATT steps.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
    connect_to_address attribute is the address to connect to.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Server starts and service added properly.
    2. Client starts to connect to server via Gatt.
    3. If no onConnectionStateChange event has arrived after
    CANCEL_CONNECTION_WAIT_TIME seconds, client calls disconnect to cancel the
    pending connection, connects again and sleeps CONNECTION_TIMEOUT seconds.
    4. If an event has already arrived, there is nothing left to cancel and the
    events received so far are the ones that get verified.

  Verifies:
    In both cases client got onConnectionStateChange EXACTLY once, with
    GATT_SUCCESS and STATE_CONNECTED.
  """
  server.server_callback = server.android.bleStartServer([SERVICE])
  start_server_result = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(start_server_result.data[STATUS], GATT_SUCCESS)
  uuids = [
      characteristic[UUID]
      for characteristic in start_server_result.data['Service'][
          'Characteristics']
  ]
  for uuid in [
      characteristic[UUID] for characteristic in SERVICE['Characteristics']
  ]:
    asserts.assert_true(uuid in uuids, 'Failed to find uuid %s.' % uuid)
  server.log.info('BLE server started')
  client.client_callback = client.android.bleConnectGatt(
      client.connect_to_address)
  time.sleep(CANCEL_CONNECTION_WAIT_TIME)
  start_client_results = client.client_callback.getAll(
      'onConnectionStateChange')
  if start_client_results:
    # The first attempt finished before it could be cancelled. These events are
    # the only record of that attempt held here, so they are checked below
    # instead of being dropped without verification.
    client.log.info('BLE client connection state changed before cancel open')
  else:
    client.android.bleDisconnect()
    client.log.info('BLE client cancel open')
    time.sleep(CANCEL_CONNECTION_WAIT_TIME)
    client.client_callback = client.android.bleConnectGatt(
        client.connect_to_address)
    # Sleep for the whole timeout rather than waiting for the first event, so
    # that an unexpected extra state change shows up in the count below.
    time.sleep(CONNECTION_TIMEOUT)
    start_client_results = client.client_callback.getAll(
        'onConnectionStateChange')
  asserts.assert_equal(len(start_client_results), 1)
  for start_client_result in start_client_results:
    asserts.assert_equal(start_client_result.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(start_client_result.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')


def Disconnect(client, server):
  """Disconnects the GATT client and stops the GATT server.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
    client_callback attribute receives the disconnection event.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client calls disconnect and waits for an onConnectionStateChange event.
    2. Server stops.

  Verifies:
    The event reports GATT_SUCCESS and STATE_DISCONNECTED.
  """
  client.android.bleDisconnect()
  disconnect_event = client.client_callback.waitAndGet(
      'onConnectionStateChange', 30)
  asserts.assert_equal(disconnect_event.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(disconnect_event.data[STATE], 'STATE_DISCONNECTED')
  client.log.info('BLE client disconnected')
  server.android.bleStopServer()
  server.log.info('BLE server stopped')


def DiscoverServices(client):
  """Runs BLE service discovery on the client and checks the test service.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
    client_callback attribute receives the discovery event.

  Steps:
    1. Client starts service discovery and sleeps CONNECTION_TIMEOUT seconds.
    2. Client collects every 'onServiceDiscovered' event received so far.

  Verifies:
    The event arrived EXACTLY once with GATT_SUCCESS, and its services include
    the test service with every characteristic of SERVICE.
  """
  client.android.bleDiscoverServices()
  # Sleep for the whole timeout rather than waiting for the first event, so
  # that an unexpected extra callback shows up in the count below.
  time.sleep(CONNECTION_TIMEOUT)
  discovery_events = client.client_callback.getAll('onServiceDiscovered')
  asserts.assert_equal(len(discovery_events), 1)
  discovery = discovery_events[0]
  asserts.assert_equal(discovery.data[STATUS], GATT_SUCCESS)
  test_service_found = False
  for service in discovery.data['Services']:
    if service['UUID'] != TEST_BLE_SERVICE_UUID:
      continue
    test_service_found = True
    discovered_uuids = [
        discovered[UUID] for discovered in service['Characteristics']
    ]
    for expected in SERVICE['Characteristics']:
      expected_uuid = expected[UUID]
      asserts.assert_true(expected_uuid in discovered_uuids,
                          'Failed to find uuid %s.' % expected_uuid)
  asserts.assert_true(test_service_found,
                      'Failed to discover the customize service')
  client.log.info('BLE discover services finished')


def ReadCharacteristic(client):
  """Reads the three read characteristics of the test service in turn.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
    client_callback attribute receives the read events.

  Steps:
    For each read characteristic:
    1. Client starts a read operation & gets a true result.
    2. Client waits for an onCharacteristicRead event.

  Verifies:
    Each event reports GATT_SUCCESS and the data stored for that
    characteristic.
  """
  # One (characteristic UUID, expected data, log message) entry per read.
  read_cases = (
      (TEST_READ_UUID, READ_DATA, 'Read operation finished'),
      (TEST_SECOND_READ_UUID, SECOND_READ_DATA,
       'Second read operation finished'),
      (TEST_THIRD_READ_UUID, THIRD_READ_DATA,
       'Third read operation finished'),
  )
  for characteristic_uuid, expected_data, finished_message in read_cases:
    read_started = client.android.bleReadOperation(TEST_BLE_SERVICE_UUID,
                                                   characteristic_uuid)
    asserts.assert_true(read_started, 'BLE read operation failed to start')
    read_event = client.client_callback.waitAndGet('onCharacteristicRead', 30)
    asserts.assert_equal(read_event.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(read_event.data['Data'], expected_data)
    client.log.info(finished_message)


def WriteCharacteristic(client, server):
  """Writes the two write characteristics of the test service in turn.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
    client_callback attribute receives the write events.
    server: AndroidDevice. The device that behaves as GATT server. Its
    server_callback attribute receives the write requests.

  Steps:
    For each write characteristic:
    1. Client starts a write operation & gets a true result.
    2. Server gets an onCharacteristicWriteRequest event.
    3. Client gets an onCharacteristicWrite event.

  Verifies:
    Each write request received by the server carries the data the client
    wrote.
  """
  # One (characteristic UUID, data to write, log message) entry per write.
  write_cases = (
      (TEST_WRITE_UUID, WRITE_DATA, 'Write operation finished'),
      (TEST_SECOND_WRITE_UUID, SECOND_WRITE_DATA,
       'Second write operation finished'),
  )
  for characteristic_uuid, payload, finished_message in write_cases:
    write_started = client.android.bleWriteOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid, payload)
    asserts.assert_true(write_started, 'BLE write operation failed to start')
    write_request = server.server_callback.waitAndGet(
        'onCharacteristicWriteRequest', 30)
    asserts.assert_equal(write_request.data['Data'], payload)
    client.client_callback.waitAndGet('onCharacteristicWrite', 30)
    client.log.info(finished_message)

def ReliableWrite(client, server):
  """Performs the characteristic writes inside a BLE reliable write.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client begins a reliable write & gets a true result.
    2. Client writes the characteristics exactly as WriteCharacteristic does.
    3. Client executes the reliable write & gets a true result.
    4. Client gets an onReliableWriteCompleted event.

  Verifies:
    Client gets the corresponding callbacks.
  """
  asserts.assert_true(client.android.bleBeginReliableWrite(),
                      'BLE reliable write failed to start')
  client.log.info('BLE reliable write started')
  WriteCharacteristic(client, server)
  asserts.assert_true(client.android.bleExecuteReliableWrite(),
                      'BLE reliable write failed to execute')
  client.log.info('BLE reliable write execute started')
  client.client_callback.waitAndGet('onReliableWriteCompleted', 30)
  client.log.info('BLE reliable write finished')


class BleTest(android_base_test.AndroidBaseTest):
  """BLE tests.

  Every test uses two devices: the initiator (dut_a), which scans and behaves
  as a GATT client, and the receiver (dut_b), which advertises and behaves as
  a GATT server.
  """

  def setup_class(self):
    """Assigns the two device roles and their log tags."""
    super(BleTest, self).setup_class()
    # The device used to scan BLE devices and behave as a GATT client.
    self.initiator = self.dut_a
    # The device used to BLE advertise and behave as a GATT server.
    self.receiver = self.dut_b
    # Set the tag that represents each device in logs.
    tagged_devices = ((self.initiator, 'initiator'),
                      (self.receiver, 'receiver'))
    for device, tag in tagged_devices:
      device.debug_tag = tag

  def setup_test(self):
    """Makes sure bluetooth is on for both devices before each test."""
    for device in (self.initiator, self.receiver):
      device.android.btEnable()

  def test_basic_ble_process(self):
    """Test for basic BLE process flow.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator connects to receiver.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator reads messages from receiver.
      5. Initiator sends messages to receiver.
      6. Initiator disconnects from receiver.
      7. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    initiator, receiver = self.initiator, self.receiver
    Discover(initiator, receiver)
    Connect(initiator, receiver)
    DiscoverServices(initiator)
    ReadCharacteristic(initiator)
    WriteCharacteristic(initiator, receiver)
    Disconnect(initiator, receiver)
    StopDiscover(initiator, receiver)

  def test_cancel_open_ble_process(self):
    """Test for BLE process flow involving cancel open.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator initiates connection to receiver, cancels connection and
      reconnects.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator disconnects from receiver.
      5. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    initiator, receiver = self.initiator, self.receiver
    Discover(initiator, receiver)
    CancelOpen(initiator, receiver)
    DiscoverServices(initiator)
    Disconnect(initiator, receiver)
    StopDiscover(initiator, receiver)

  def test_reliable_write_ble_process(self):
    """Test for BLE process flow involving reliable write.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator connects to receiver.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator starts reliable write to receiver and finishes successfully.
      5. Initiator disconnects from receiver.
      6. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    initiator, receiver = self.initiator, self.receiver
    Discover(initiator, receiver)
    Connect(initiator, receiver)
    DiscoverServices(initiator)
    ReliableWrite(initiator, receiver)
    Disconnect(initiator, receiver)
    StopDiscover(initiator, receiver)

  def teardown_test(self):
    """Turns Bluetooth off on both devices after each test finishes."""
    for device in (self.initiator, self.receiver):
      device.android.btDisable()


# Script entry point: hand control to mobly's test_runner when this file is
# executed directly rather than imported.
if __name__ == '__main__':
  test_runner.main()