普通文本  |  163行  |  6.46 KB

#!/usr/bin/python3.4
#
#   Copyright 2016 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue

import acts.base_test as base_test
import acts.controllers.android_device as android_device
import acts.test_utils.wifi.wifi_test_utils as wutils

from acts import asserts

ON_IDENTITY_CHANGED = "WifiNanOnIdentityChanged"
ON_MATCH = "WifiNanSessionOnMatch"
ON_MESSAGE_RX = "WifiNanSessionOnMessageReceived"
ON_MESSAGE_TX_FAIL = "WifiNanSessionOnMessageSendFail"
ON_MESSAGE_TX_OK = "WifiNanSessionOnMessageSendSuccess"

class WifiNanManagerTest(base_test.BaseTestClass):
    msg_id = 10

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.publisher = self.android_devices[0]
        self.subscriber = self.android_devices[1]
        self.tests = (
            "test_nan_base_test",
            )

    def setup_class(self):
        required_params = (
            "config_request1",
            "config_request2",
            "publish_data",
            "publish_settings",
            "subscribe_data",
            "subscribe_settings"
        )
        self.unpack_userparams(required_params)

    def setup_test(self):
        assert wutils.wifi_toggle_state(self.publisher, True)
        assert wutils.wifi_toggle_state(self.subscriber, True)

    def teardown_class(self):
        assert wutils.wifi_toggle_state(self.publisher, False)
        assert wutils.wifi_toggle_state(self.subscriber, False)

    def reliable_tx(self, device, peer, msg):
        num_tries = 0
        max_num_tries = 10
        events_regex = '%s|%s' % (ON_MESSAGE_TX_FAIL, ON_MESSAGE_TX_OK)
        self.msg_id = self.msg_id + 1

        while True:
            try:
                num_tries += 1
                device.droid.wifiNanSendMessage(peer, msg, self.msg_id)
                events = device.ed.pop_events(events_regex, 30)
                for event in events:
                    self.log.info('%s: %s' % (event['name'], event['data']))
                    if event['data']['messageId'] != self.msg_id:
                        continue
                    if event['name'] == ON_MESSAGE_TX_OK:
                        return True
                    if num_tries == max_num_tries:
                        self.log.info("Max number of retries reached")
                        return False
            except queue.Empty:
                self.log.info('Timed out while waiting for %s' % events_regex)
                return False

    def test_nan_base_test(self):
        """Perform NAN configuration, discovery, and message exchange.

        Configuration: 2 devices, one acting as Publisher (P) and the
        other as Subscriber (S)

        Logical steps:
          * P & S configure NAN
          * P & S wait for NAN configuration confirmation
          * P starts publishing
          * S starts subscribing
          * S waits for a match (discovery) notification
          * S sends a message to P, confirming that sent successfully
          * P waits for a message and confirms that received (uncorrupted)
          * P sends a message to S, confirming that sent successfully
          * S waits for a message and confirms that received (uncorrupted)
        """
        self.publisher.droid.wifiNanEnable(self.config_request1)
        self.subscriber.droid.wifiNanEnable(self.config_request2)

        sub2pub_msg = "How are you doing?"
        pub2sub_msg = "Doing ok - thanks!"

        try:
            event = self.publisher.ed.pop_event(ON_IDENTITY_CHANGED, 30)
            self.log.info('%s: %s' % (ON_IDENTITY_CHANGED, event['data']))
        except queue.Empty:
            asserts.fail('Timed out while waiting for %s on Publisher' %
                      ON_IDENTITY_CHANGED)
        self.log.debug(event)

        try:
            event = self.subscriber.ed.pop_event(ON_IDENTITY_CHANGED, 30)
            self.log.info('%s: %s' % (ON_IDENTITY_CHANGED, event['data']))
        except queue.Empty:
            asserts.fail('Timed out while waiting for %s on Subscriber' %
                      ON_IDENTITY_CHANGED)
        self.log.debug(event)

        self.publisher.droid.wifiNanPublish(self.publish_data,
                                            self.publish_settings, 0)
        self.subscriber.droid.wifiNanSubscribe(self.subscribe_data,
                                               self.subscribe_settings, 0)

        try:
            event = self.subscriber.ed.pop_event(ON_MATCH, 30)
            self.log.info('%s: %s' % (ON_MATCH, event['data']))
        except queue.Empty:
            asserts.fail('Timed out while waiting for %s on Subscriber' % ON_MATCH)
        self.log.debug(event)

        asserts.assert_true(self.reliable_tx(self.subscriber,
                                          event['data']['peerId'],
                                          sub2pub_msg),
                         "Failed to transmit from subscriber")

        try:
            event = self.publisher.ed.pop_event(ON_MESSAGE_RX, 10)
            self.log.info('%s: %s' % (ON_MESSAGE_RX, event['data']))
            asserts.assert_true(event['data']['messageAsString'] == sub2pub_msg,
                             "Subscriber -> publisher message corrupted")
        except queue.Empty:
            asserts.fail('Timed out while waiting for %s on publisher' %
                      ON_MESSAGE_RX)

        asserts.assert_true(self.reliable_tx(self.publisher,
                                          event['data']['peerId'],
                                          pub2sub_msg),
                         "Failed to transmit from publisher")

        try:
            event = self.subscriber.ed.pop_event(ON_MESSAGE_RX, 10)
            self.log.info('%s: %s' % (ON_MESSAGE_RX, event['data']))
            asserts.assert_true(event['data']['messageAsString'] == pub2sub_msg,
                             "Publisher -> subscriber message corrupted")
        except queue.Empty:
            asserts.fail('Timed out while waiting for %s on subscriber' %
                      ON_MESSAGE_RX)