#!/usr/bin/python3.4
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue
import acts.base_test as base_test
import acts.controllers.android_device as android_device
import acts.test_utils.wifi.wifi_test_utils as wutils
from acts import asserts
# Names of the events this test waits for on a device's event dispatcher
# (`<device>.ed`).

# Waited for on both devices after wifiNanEnable() has been called.
ON_IDENTITY_CHANGED = "WifiNanOnIdentityChanged"
# Waited for on the subscriber after publish/subscribe have been started; its
# data carries the 'peerId' used to address the first message.
ON_MATCH = "WifiNanSessionOnMatch"
# Waited for on the receiving device; its data carries 'messageAsString' and
# the sender's 'peerId'.
ON_MESSAGE_RX = "WifiNanSessionOnMessageReceived"
# Outcome events for wifiNanSendMessage(); their data carries the 'messageId'
# that was passed to the send call.
ON_MESSAGE_TX_FAIL = "WifiNanSessionOnMessageSendFail"
ON_MESSAGE_TX_OK = "WifiNanSessionOnMessageSendSuccess"
class WifiNanManagerTest(base_test.BaseTestClass):
    """Two-device Wi-Fi NAN test: configuration, discovery and messaging.

    The first Android device acts as the Publisher and the second one as the
    Subscriber.
    """

    # Last message ID handed to wifiNanSendMessage(). reliable_tx() increments
    # it before each new message so TX status events can be matched to the
    # message that triggered them.
    msg_id = 10

    def __init__(self, controllers):
        """Bind the publisher/subscriber devices and register the tests.

        Args:
            controllers: Controller objects forwarded unchanged to
                BaseTestClass.
        """
        super().__init__(controllers)
        self.publisher = self.android_devices[0]
        self.subscriber = self.android_devices[1]
        self.tests = (
            "test_nan_base_test",
        )

    def setup_class(self):
        """Unpack the user parameters the test reads as attributes."""
        required_params = (
            "config_request1",
            "config_request2",
            "publish_data",
            "publish_settings",
            "subscribe_data",
            "subscribe_settings",
        )
        self.unpack_userparams(required_params)

    def setup_test(self):
        """Turn Wi-Fi on for both devices before each test."""
        self._set_wifi_state(True)

    def teardown_class(self):
        """Turn Wi-Fi off for both devices once the class is done."""
        self._set_wifi_state(False)

    def _set_wifi_state(self, enabled):
        """Toggle Wi-Fi on the publisher, then the subscriber.

        Uses asserts.assert_true rather than a bare `assert` so the check is
        not stripped when Python runs with -O and so a failure carries a
        message.

        Args:
            enabled: True to turn Wi-Fi on, False to turn it off.
        """
        state = "on" if enabled else "off"
        for name, device in (("publisher", self.publisher),
                             ("subscriber", self.subscriber)):
            asserts.assert_true(
                wutils.wifi_toggle_state(device, enabled),
                "Failed to turn Wi-Fi %s on %s" % (state, name))

    def _wait_for_event(self, device, device_name, event_name, timeout):
        """Pop one event from a device, failing the test on timeout.

        Args:
            device: Device whose event dispatcher (`ed`) is polled.
            device_name: Label for the device, used verbatim in the timeout
                failure message.
            event_name: Name of the event to wait for.
            timeout: Timeout value passed to pop_event().

        Returns:
            The popped event.
        """
        try:
            event = device.ed.pop_event(event_name, timeout)
        except queue.Empty:
            asserts.fail('Timed out while waiting for %s on %s' %
                         (event_name, device_name))
        self.log.info('%s: %s', event_name, event['data'])
        self.log.debug(event)
        return event

    def reliable_tx(self, device, peer, msg):
        """Send a message, retrying until the stack confirms transmission.

        A fresh message ID is allocated once and reused for every retry. Only
        TX status events carrying that ID are considered; others are logged
        and skipped.

        Args:
            device: Device to send from.
            peer: Peer ID to send the message to.
            msg: Message payload.

        Returns:
            True once a send-success event for this message is seen. False if
            waiting for a TX status event times out, or if no success has
            been seen after the maximum number of attempts.
        """
        max_num_tries = 10
        events_regex = '%s|%s' % (ON_MESSAGE_TX_FAIL, ON_MESSAGE_TX_OK)
        self.msg_id += 1

        # Bounded loop: the attempt limit applies even when the only events
        # received belong to other message IDs.
        for _ in range(max_num_tries):
            device.droid.wifiNanSendMessage(peer, msg, self.msg_id)
            try:
                events = device.ed.pop_events(events_regex, 30)
            except queue.Empty:
                self.log.info('Timed out while waiting for %s', events_regex)
                return False

            for event in events:
                self.log.info('%s: %s', event['name'], event['data'])
                if event['data']['messageId'] != self.msg_id:
                    continue
                if event['name'] == ON_MESSAGE_TX_OK:
                    return True

        self.log.info("Max number of retries reached")
        return False

    def test_nan_base_test(self):
        """Perform NAN configuration, discovery, and message exchange.

        Configuration: 2 devices, one acting as Publisher (P) and the
        other as Subscriber (S)

        Logical steps:
          * P & S configure NAN
          * P & S wait for NAN configuration confirmation
          * P starts publishing
          * S starts subscribing
          * S waits for a match (discovery) notification
          * S sends a message to P, confirming that sent successfully
          * P waits for a message and confirms that received (uncorrupted)
          * P sends a message to S, confirming that sent successfully
          * S waits for a message and confirms that received (uncorrupted)
        """
        self.publisher.droid.wifiNanEnable(self.config_request1)
        self.subscriber.droid.wifiNanEnable(self.config_request2)

        sub2pub_msg = "How are you doing?"
        pub2sub_msg = "Doing ok - thanks!"

        # Both devices must confirm the NAN configuration.
        self._wait_for_event(self.publisher, 'Publisher',
                             ON_IDENTITY_CHANGED, 30)
        self._wait_for_event(self.subscriber, 'Subscriber',
                             ON_IDENTITY_CHANGED, 30)

        self.publisher.droid.wifiNanPublish(self.publish_data,
                                            self.publish_settings, 0)
        self.subscriber.droid.wifiNanSubscribe(self.subscribe_data,
                                               self.subscribe_settings, 0)

        # Discovery: the match event supplies the publisher's peer ID.
        match = self._wait_for_event(self.subscriber, 'Subscriber',
                                     ON_MATCH, 30)

        # Subscriber -> publisher.
        asserts.assert_true(
            self.reliable_tx(self.subscriber, match['data']['peerId'],
                             sub2pub_msg),
            "Failed to transmit from subscriber")
        received = self._wait_for_event(self.publisher, 'publisher',
                                        ON_MESSAGE_RX, 10)
        asserts.assert_true(
            received['data']['messageAsString'] == sub2pub_msg,
            "Subscriber -> publisher message corrupted")

        # Publisher -> subscriber, addressed to the peer ID reported with the
        # message the publisher just received.
        asserts.assert_true(
            self.reliable_tx(self.publisher, received['data']['peerId'],
                             pub2sub_msg),
            "Failed to transmit from publisher")
        reply = self._wait_for_event(self.subscriber, 'subscriber',
                                     ON_MESSAGE_RX, 10)
        asserts.assert_true(
            reply['data']['messageAsString'] == pub2sub_msg,
            "Publisher -> subscriber message corrupted")