普通文本  |  273行  |  8.96 KB

# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Chrome OS Parnter Concole remote actions."""

from __future__ import print_function

import base64
import logging

import common

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.server.hosts import moblab_host
from autotest_lib.site_utils import pubsub_utils
from autotest_lib.site_utils import cloud_console_pb2 as cpcon


# Cloud pubsub topic name, read from the 'cloud_notification_topic' key of
# the 'CROS' config section with None passed as the default. Used as the
# default topic of PubSubBasedClient.
_PUBSUB_TOPIC = global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_topic', default=None)

# Current notification version. Sent as a string in the message attributes.
CURRENT_MESSAGE_VERSION = '1'

# Attribute keys of the legacy test upload pubsub notification.
LEGACY_ATTR_VERSION = 'version'
LEGACY_ATTR_GCS_URI = 'gcs_uri'
LEGACY_ATTR_MOBLAB_MAC = 'moblab_mac_address'
LEGACY_ATTR_MOBLAB_ID = 'moblab_id'
# The message data of the legacy "new test result" notification.
LEGACY_TEST_OFFLOAD_MESSAGE = 'NEW_TEST_RESULT'


def is_cloud_notification_enabled():
    """Tells whether cloud pubsub notification is turned on.

    Looks up the 'cloud_notification_enabled' key of the 'CROS' config
    section as a bool, passing False as the default.

    @returns: True if cloud pubsub notification is enabled. Otherwise, False.
    """
    config = global_config.global_config
    enabled = config.get_config_value(
            'CROS', 'cloud_notification_enabled', type=bool, default=False)
    return enabled


def _get_message_type_name(message_type_enum):
    """Maps a message type enum value to its name.

    The lookup is delegated to cpcon.MessageType.Name(); no fallback name is
    substituted here.

    @param message_type_enum: The message type enum.

    @return The corresponding message type name as string.
    @raises ValueError: Presumably when the value is not defined in the enum
        (protobuf enum Name() behavior); the senders in this module catch
        ValueError while building message attributes.
    """
    message_types = cpcon.MessageType
    return message_types.Name(message_type_enum)


def _get_attribute_name(attribute_enum):
    """Maps a message attribute enum value to its name.

    The lookup is delegated to cpcon.MessageAttribute.Name(); no fallback
    name is substituted here.

    @param attribute_enum: The attribute enum.

    @return The corresponding attribute name as string.
    @raises ValueError: Presumably when the value is not defined in the enum
        (protobuf enum Name() behavior); the senders in this module catch
        ValueError while building message attributes.
    """
    attribute_names = cpcon.MessageAttribute
    return attribute_names.Name(attribute_enum)


class CloudConsoleClient(object):
    """The remote interface to the Cloud Console.

    Every method of this base class is a no-op that returns None. The
    True/False results described below are the contract for implementations
    that override them.
    """

    def send_heartbeat(self):
        """Sends a heartbeat to the remote console.

        @returns True if the notification is successfully sent.
            Otherwise, False. This base implementation does nothing and
            returns None.
        """

    def send_event(self, event_type=None, event_data=None):
        """Sends an event notification to the remote console.

        @param event_type: The event type that is defined in the protobuffer
            file 'cloud_console.proto'.
        @param event_data: The event data.

        @returns True if the notification is successfully sent.
            Otherwise, False. This base implementation does nothing and
            returns None.
        """

    def send_log(self, msg, level=None, session_id=None):
        """Sends a log message to the remote console.

        @param msg: The log message.
        @param level: The logging level.
        @param session_id: The current session id.

        @returns True if the notification is successfully sent.
            Otherwise, False. This base implementation does nothing and
            returns None.
        """

    def send_alert(self, msg, level=None, session_id=None):
        """Sends an alert to the remote console.

        @param msg: The alert message.
        @param level: The logging level.
        @param session_id: The current session id.

        @returns True if the notification is successfully sent.
            Otherwise, False. This base implementation does nothing and
            returns None.
        """

    def send_test_job_offloaded_message(self, gcs_uri):
        """Sends a test job offloaded message to the remote console.

        @param gcs_uri: The test result Google Cloud Storage URI.

        @returns True if the notification is successfully sent.
            Otherwise, False. This base implementation does nothing and
            returns None.
        """


# Client creation lives in its own module-level function so that it is easy
# to mock out.
def _create_pubsub_client(credential):
    """Builds the pubsub client.

    @param credential: Passed through unchanged to pubsub_utils.PubSubClient.

    @returns A new pubsub_utils.PubSubClient instance.
    """
    pubsub_client = pubsub_utils.PubSubClient(credential)
    return pubsub_client


class PubSubBasedClient(CloudConsoleClient):
    """A Cloud PubSub based implementation of the CloudConsoleClient interface.

    Only send_test_job_offloaded_message(), send_heartbeat() and send_event()
    are implemented here; send_log() and send_alert() are inherited unchanged
    from CloudConsoleClient.
    """
    def __init__(
            self,
            credential=moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION,
            pubsub_topic=_PUBSUB_TOPIC):
        """Constructor.

        @param credential: The service account credential filename. Default to
            moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION.
        @param pubsub_topic: The cloud pubsub topic name to use. Default to
            the module-level _PUBSUB_TOPIC config value.
        """
        super(PubSubBasedClient, self).__init__()
        self._pubsub_client = _create_pubsub_client(credential)
        self._pubsub_topic = pubsub_topic


    def _create_message(self, data, msg_attributes):
        """Creates a cloud pubsub notification object.

        Falsy arguments are left out: the 'data' and 'attributes' keys are
        only set when the corresponding argument is non-empty.

        @param data: The message data as a string.
        @param msg_attributes: The message attribute map.

        @returns: A pubsub message object (a dict) with data and attributes.
        """
        message = {}
        if data:
            message['data'] = data
        if msg_attributes:
            message['attributes'] = msg_attributes
        return message

    def _create_message_attributes(self, message_type_enum):
        """Creates a cloud pubsub notification message attribute map.

        Fills in the message type, the message version, the moblab serial
        number and the moblab id as attributes. Keys are the names of the
        cpcon.MessageAttribute enum values.

        @param message_type_enum The message type enum.

        @returns: A pubsub messsage attribute map.
        @raises ValueError: Presumably when an enum value has no name; the
            callers in this class catch ValueError around this call.
        """
        msg_attributes = {}
        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (
                _get_message_type_name(message_type_enum))
        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (
                CURRENT_MESSAGE_VERSION)
        # The attribute is named after the MAC address but carries the value
        # of utils.get_moblab_serial_number(), same as the legacy message.
        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (
                utils.get_moblab_serial_number())
        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (
                utils.get_moblab_id())
        return msg_attributes

    def _create_test_job_offloaded_message(self, gcs_uri):
        """Construct a test result notification.

        Uses the legacy format: the data is the base64 encoded
        LEGACY_TEST_OFFLOAD_MESSAGE and the attributes use the LEGACY_ATTR_*
        keys.

        TODO(ntang): switch LEGACY to new message format.
        @param gcs_uri: The test result Google Cloud Storage URI.

        @returns The notification message.
        """
        # On Python 3 b64encode() only accepts bytes and returns bytes, so
        # encode the text first and decode the result. This keeps 'data' a
        # text string, as _create_message() documents, and still works on
        # Python 2.
        data = base64.b64encode(
                LEGACY_TEST_OFFLOAD_MESSAGE.encode('utf-8')).decode('ascii')
        msg_attributes = {}
        msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION
        msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (
                utils.get_moblab_serial_number())
        msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()
        msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri

        return self._create_message(data, msg_attributes)


    def send_test_job_offloaded_message(self, gcs_uri):
        """Notify the cloud console a test job is offloaded.

        @param gcs_uri: The test result Google Cloud Storage URI.

        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        logging.info('Notification on gcs_uri %s', gcs_uri)
        message = self._create_test_job_offloaded_message(gcs_uri)
        return self._publish_notification(message)


    def _publish_notification(self, message):
        """Publishes a single message to the configured pubsub topic.

        @param message: The pubsub message object to publish.

        @returns True if the pubsub client returned a non-empty result for
            the publish call. Otherwise, False.
        """
        msg_ids = self._pubsub_client.publish_notifications(
                self._pubsub_topic, [message])

        if msg_ids:
            logging.debug('Successfully sent out a notification')
            return True
        # '%s' already applies str() to the message, and only when the record
        # is actually emitted.
        logging.warning('Failed to send notification %s', message)
        return False

    def send_heartbeat(self):
        """Sends a heartbeat.

        @returns True if the heartbeat notification is successfully sent.
            Otherwise, False (including when the message cannot be created).
        """
        logging.info('Sending a heartbeat')

        event = cpcon.Heartbeat()
        # Don't send local timestamp for now.
        data = event.SerializeToString()
        try:
            attributes = self._create_message_attributes(
                    cpcon.MSG_MOBLAB_HEARTBEAT)
            message = self._create_message(data, attributes)
        except ValueError:
            logging.exception('Failed to create message.')
            return False
        return self._publish_notification(message)

    def send_event(self, event_type=None, event_data=None):
        """Sends an event notification to the remote console.

        @param event_type: The event type that is defined in the protobuffer
            file 'cloud_console.proto'. A falsy value is rejected.
        @param event_data: The event data. A falsy value is sent as ''.

        @returns True if the notification is successfully sent.
            Otherwise, False (including when event_type is missing or the
            message cannot be created).
        """
        logging.info('Send an event.')
        if not event_type:
            logging.info('Failed to send event without a type.')
            return False

        event = cpcon.RemoteEventMessage()
        event.data = event_data if event_data else ''
        event.type = event_type
        # NOTE(review): unlike the test-job-offloaded payload, the serialized
        # protobuf is not base64 encoded here - confirm whether the pubsub
        # client expects raw or encoded data.
        data = event.SerializeToString()
        try:
            attributes = self._create_message_attributes(
                    cpcon.MSG_MOBLAB_REMOTE_EVENT)
            message = self._create_message(data, attributes)
        except ValueError:
            logging.exception('Failed to create message.')
            return False
        return self._publish_notification(message)