普通文本  |  615行  |  21.28 KB

#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This is a python sync server used for testing Chrome Sync.

By default, it listens on an ephemeral port and xmpp_port and sends the port
numbers back to the originating process over a pipe. The originating process can
specify an explicit port and xmpp_port if necessary.
"""

import asyncore
import BaseHTTPServer
import errno
import os
import select
import socket
import sys
import urlparse

import chromiumsync
import echo_message
import testserver_base
import xmppserver


class SyncHTTPServer(testserver_base.ClientRestrictingServerMixIn,
                     testserver_base.BrokenPipeHandlerMixIn,
                     testserver_base.StoppableHTTPServer):
  """An HTTP server that handles sync commands."""

  def __init__(self, server_address, xmpp_port, request_handler_class):
    testserver_base.StoppableHTTPServer.__init__(self,
                                                 server_address,
                                                 request_handler_class)
    self._sync_handler = chromiumsync.TestServer()
    self._xmpp_socket_map = {}
    self._xmpp_server = xmppserver.XmppServer(
      self._xmpp_socket_map, ('localhost', xmpp_port))
    self.xmpp_port = self._xmpp_server.getsockname()[1]
    self.authenticated = True

  def GetXmppServer(self):
    return self._xmpp_server

  def HandleCommand(self, query, raw_request):
    return self._sync_handler.HandleCommand(query, raw_request)

  def HandleRequestNoBlock(self):
    """Handles a single request.

    Copied from SocketServer._handle_request_noblock().
    """

    try:
      request, client_address = self.get_request()
    except socket.error:
      return
    if self.verify_request(request, client_address):
      try:
        self.process_request(request, client_address)
      except Exception:
        self.handle_error(request, client_address)
        self.close_request(request)

  def SetAuthenticated(self, auth_valid):
    self.authenticated = auth_valid

  def GetAuthenticated(self):
    return self.authenticated

  def serve_forever(self):
    """This is a merge of asyncore.loop() and SocketServer.serve_forever().
    """

    def HandleXmppSocket(fd, socket_map, handler):
      """Runs the handler for the xmpp connection for fd.

      Adapted from asyncore.read() et al.
      """

      xmpp_connection = socket_map.get(fd)
      # This could happen if a previous handler call caused fd to get
      # removed from socket_map.
      if xmpp_connection is None:
        return
      try:
        handler(xmpp_connection)
      except (asyncore.ExitNow, KeyboardInterrupt, SystemExit):
        raise
      except:
        xmpp_connection.handle_error()

    while True:
      read_fds = [ self.fileno() ]
      write_fds = []
      exceptional_fds = []

      for fd, xmpp_connection in self._xmpp_socket_map.items():
        is_r = xmpp_connection.readable()
        is_w = xmpp_connection.writable()
        if is_r:
          read_fds.append(fd)
        if is_w:
          write_fds.append(fd)
        if is_r or is_w:
          exceptional_fds.append(fd)

      try:
        read_fds, write_fds, exceptional_fds = (
          select.select(read_fds, write_fds, exceptional_fds))
      except select.error, err:
        if err.args[0] != errno.EINTR:
          raise
        else:
          continue

      for fd in read_fds:
        if fd == self.fileno():
          self.HandleRequestNoBlock()
          continue
        HandleXmppSocket(fd, self._xmpp_socket_map,
                         asyncore.dispatcher.handle_read_event)

      for fd in write_fds:
        HandleXmppSocket(fd, self._xmpp_socket_map,
                         asyncore.dispatcher.handle_write_event)

      for fd in exceptional_fds:
        HandleXmppSocket(fd, self._xmpp_socket_map,
                         asyncore.dispatcher.handle_expt_event)


class SyncPageHandler(testserver_base.BasePageHandler):
  """Handler for the main HTTP sync server."""

  def __init__(self, request, client_address, sync_http_server):
    get_handlers = [self.ChromiumSyncTimeHandler,
                    self.ChromiumSyncMigrationOpHandler,
                    self.ChromiumSyncCredHandler,
                    self.ChromiumSyncXmppCredHandler,
                    self.ChromiumSyncDisableNotificationsOpHandler,
                    self.ChromiumSyncEnableNotificationsOpHandler,
                    self.ChromiumSyncSendNotificationOpHandler,
                    self.ChromiumSyncBirthdayErrorOpHandler,
                    self.ChromiumSyncTransientErrorOpHandler,
                    self.ChromiumSyncErrorOpHandler,
                    self.ChromiumSyncSyncTabFaviconsOpHandler,
                    self.ChromiumSyncCreateSyncedBookmarksOpHandler,
                    self.ChromiumSyncEnableKeystoreEncryptionOpHandler,
                    self.ChromiumSyncRotateKeystoreKeysOpHandler,
                    self.ChromiumSyncEnableManagedUserAcknowledgementHandler,
                    self.ChromiumSyncEnablePreCommitGetUpdateAvoidanceHandler,
                    self.GaiaOAuth2TokenHandler,
                    self.GaiaSetOAuth2TokenResponseHandler,
                    self.TriggerSyncedNotificationHandler,
                    self.SyncedNotificationsPageHandler,
                    self.CustomizeClientCommandHandler]

    post_handlers = [self.ChromiumSyncCommandHandler,
                     self.ChromiumSyncTimeHandler,
                     self.GaiaOAuth2TokenHandler,
                     self.GaiaSetOAuth2TokenResponseHandler]
    testserver_base.BasePageHandler.__init__(self, request, client_address,
                                             sync_http_server, [], get_handlers,
                                             [], post_handlers, [])


  def ChromiumSyncTimeHandler(self):
    """Handle Chromium sync .../time requests.

    The syncer sometimes checks server reachability by examining /time.
    """

    test_name = "/chromiumsync/time"
    if not self._ShouldHandleRequest(test_name):
      return False

    # Chrome hates it if we send a response before reading the request.
    if self.headers.getheader('content-length'):
      length = int(self.headers.getheader('content-length'))
      _raw_request = self.rfile.read(length)

    self.send_response(200)
    self.send_header('Content-Type', 'text/plain')
    self.end_headers()
    self.wfile.write('0123456789')
    return True

  def ChromiumSyncCommandHandler(self):
    """Handle a chromiumsync command arriving via http.

    This covers all sync protocol commands: authentication, getupdates, and
    commit.
    """

    test_name = "/chromiumsync/command"
    if not self._ShouldHandleRequest(test_name):
      return False

    length = int(self.headers.getheader('content-length'))
    raw_request = self.rfile.read(length)
    http_response = 200
    raw_reply = None
    if not self.server.GetAuthenticated():
      http_response = 401
      challenge = 'GoogleLogin realm="http://%s", service="chromiumsync"' % (
        self.server.server_address[0])
    else:
      http_response, raw_reply = self.server.HandleCommand(
          self.path, raw_request)

    ### Now send the response to the client. ###
    self.send_response(http_response)
    if http_response == 401:
      self.send_header('www-Authenticate', challenge)
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncMigrationOpHandler(self):
    test_name = "/chromiumsync/migrate"
    if not self._ShouldHandleRequest(test_name):
      return False

    http_response, raw_reply = self.server._sync_handler.HandleMigrate(
        self.path)
    self.send_response(http_response)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncCredHandler(self):
    test_name = "/chromiumsync/cred"
    if not self._ShouldHandleRequest(test_name):
      return False
    try:
      query = urlparse.urlparse(self.path)[4]
      cred_valid = urlparse.parse_qs(query)['valid']
      if cred_valid[0] == 'True':
        self.server.SetAuthenticated(True)
      else:
        self.server.SetAuthenticated(False)
    except Exception:
      self.server.SetAuthenticated(False)

    http_response = 200
    raw_reply = 'Authenticated: %s ' % self.server.GetAuthenticated()
    self.send_response(http_response)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncXmppCredHandler(self):
    test_name = "/chromiumsync/xmppcred"
    if not self._ShouldHandleRequest(test_name):
      return False
    xmpp_server = self.server.GetXmppServer()
    try:
      query = urlparse.urlparse(self.path)[4]
      cred_valid = urlparse.parse_qs(query)['valid']
      if cred_valid[0] == 'True':
        xmpp_server.SetAuthenticated(True)
      else:
        xmpp_server.SetAuthenticated(False)
    except:
      xmpp_server.SetAuthenticated(False)

    http_response = 200
    raw_reply = 'XMPP Authenticated: %s ' % xmpp_server.GetAuthenticated()
    self.send_response(http_response)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncDisableNotificationsOpHandler(self):
    test_name = "/chromiumsync/disablenotifications"
    if not self._ShouldHandleRequest(test_name):
      return False
    self.server.GetXmppServer().DisableNotifications()
    result = 200
    raw_reply = ('<html><title>Notifications disabled</title>'
                 '<H1>Notifications disabled</H1></html>')
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncEnableNotificationsOpHandler(self):
    test_name = "/chromiumsync/enablenotifications"
    if not self._ShouldHandleRequest(test_name):
      return False
    self.server.GetXmppServer().EnableNotifications()
    result = 200
    raw_reply = ('<html><title>Notifications enabled</title>'
                 '<H1>Notifications enabled</H1></html>')
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncSendNotificationOpHandler(self):
    test_name = "/chromiumsync/sendnotification"
    if not self._ShouldHandleRequest(test_name):
      return False
    query = urlparse.urlparse(self.path)[4]
    query_params = urlparse.parse_qs(query)
    channel = ''
    data = ''
    if 'channel' in query_params:
      channel = query_params['channel'][0]
    if 'data' in query_params:
      data = query_params['data'][0]
    self.server.GetXmppServer().SendNotification(channel, data)
    result = 200
    raw_reply = ('<html><title>Notification sent</title>'
                 '<H1>Notification sent with channel "%s" '
                 'and data "%s"</H1></html>'
                 % (channel, data))
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncBirthdayErrorOpHandler(self):
    test_name = "/chromiumsync/birthdayerror"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = self.server._sync_handler.HandleCreateBirthdayError()
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncTransientErrorOpHandler(self):
    test_name = "/chromiumsync/transienterror"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = self.server._sync_handler.HandleSetTransientError()
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncErrorOpHandler(self):
    test_name = "/chromiumsync/error"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = self.server._sync_handler.HandleSetInducedError(
        self.path)
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncSyncTabFaviconsOpHandler(self):
    test_name = "/chromiumsync/synctabfavicons"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = self.server._sync_handler.HandleSetSyncTabFavicons()
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncCreateSyncedBookmarksOpHandler(self):
    test_name = "/chromiumsync/createsyncedbookmarks"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = self.server._sync_handler.HandleCreateSyncedBookmarks()
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncEnableKeystoreEncryptionOpHandler(self):
    test_name = "/chromiumsync/enablekeystoreencryption"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = (
        self.server._sync_handler.HandleEnableKeystoreEncryption())
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncRotateKeystoreKeysOpHandler(self):
    test_name = "/chromiumsync/rotatekeystorekeys"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = (
        self.server._sync_handler.HandleRotateKeystoreKeys())
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncEnableManagedUserAcknowledgementHandler(self):
    test_name = "/chromiumsync/enablemanageduseracknowledgement"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = (
        self.server._sync_handler.HandleEnableManagedUserAcknowledgement())
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def ChromiumSyncEnablePreCommitGetUpdateAvoidanceHandler(self):
    test_name = "/chromiumsync/enableprecommitgetupdateavoidance"
    if not self._ShouldHandleRequest(test_name):
      return False
    result, raw_reply = (
        self.server._sync_handler.HandleEnablePreCommitGetUpdateAvoidance())
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def GaiaOAuth2TokenHandler(self):
    test_name = "/o/oauth2/token"
    if not self._ShouldHandleRequest(test_name):
      return False
    if self.headers.getheader('content-length'):
      length = int(self.headers.getheader('content-length'))
      _raw_request = self.rfile.read(length)
    result, raw_reply = (
        self.server._sync_handler.HandleGetOauth2Token())
    self.send_response(result)
    self.send_header('Content-Type', 'application/json')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def GaiaSetOAuth2TokenResponseHandler(self):
    test_name = "/setfakeoauth2token"
    if not self._ShouldHandleRequest(test_name):
      return False

    # The index of 'query' is 4.
    # See http://docs.python.org/2/library/urlparse.html
    query = urlparse.urlparse(self.path)[4]
    query_params = urlparse.parse_qs(query)

    response_code = 0
    request_token = ''
    access_token = ''
    expires_in = 0
    token_type = ''

    if 'response_code' in query_params:
      response_code = query_params['response_code'][0]
    if 'request_token' in query_params:
      request_token = query_params['request_token'][0]
    if 'access_token' in query_params:
      access_token = query_params['access_token'][0]
    if 'expires_in' in query_params:
      expires_in = query_params['expires_in'][0]
    if 'token_type' in query_params:
      token_type = query_params['token_type'][0]

    result, raw_reply = (
        self.server._sync_handler.HandleSetOauth2Token(
            response_code, request_token, access_token, expires_in, token_type))
    self.send_response(result)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(raw_reply))
    self.end_headers()
    self.wfile.write(raw_reply)
    return True

  def TriggerSyncedNotificationHandler(self):
    test_name = "/triggersyncednotification"
    if not self._ShouldHandleRequest(test_name):
      return False

    query = urlparse.urlparse(self.path)[4]
    query_params = urlparse.parse_qs(query)

    serialized_notification = ''

    if 'serialized_notification' in query_params:
      serialized_notification = query_params['serialized_notification'][0]

    try:
      notification_string = self.server._sync_handler.account \
          .AddSyncedNotification(serialized_notification)
      reply = "A synced notification was triggered:\n\n"
      reply += "<code>{}</code>.".format(notification_string)
      response_code = 200
    except chromiumsync.ClientNotConnectedError:
      reply = ('The client is not connected to the server, so the notification'
               ' could not be created.')
      response_code = 400

    self.send_response(response_code)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(reply))
    self.end_headers()
    self.wfile.write(reply)
    return True

  def CustomizeClientCommandHandler(self):
    test_name = "/customizeclientcommand"
    if not self._ShouldHandleRequest(test_name):
      return False

    query = urlparse.urlparse(self.path)[4]
    query_params = urlparse.parse_qs(query)

    if 'sessions_commit_delay_seconds' in query_params:
      sessions_commit_delay = query_params['sessions_commit_delay_seconds'][0]
      try:
        command_string = self.server._sync_handler.CustomizeClientCommand(
            int(sessions_commit_delay))
        response_code = 200
        reply = "The ClientCommand was customized:\n\n"
        reply += "<code>{}</code>.".format(command_string)
      except ValueError:
        response_code = 400
        reply = "sessions_commit_delay_seconds was not an int"
    else:
      response_code = 400
      reply = "sessions_commit_delay_seconds is required"

    self.send_response(response_code)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(reply))
    self.end_headers()
    self.wfile.write(reply)
    return True

  def SyncedNotificationsPageHandler(self):
    test_name = "/syncednotifications"
    if not self._ShouldHandleRequest(test_name):
      return False

    html = open('sync/tools/testserver/synced_notifications.html', 'r').read()

    self.send_response(200)
    self.send_header('Content-Type', 'text/html')
    self.send_header('Content-Length', len(html))
    self.end_headers()
    self.wfile.write(html)
    return True


class SyncServerRunner(testserver_base.TestServerRunner):
  """TestServerRunner for the net test servers."""

  def __init__(self):
    super(SyncServerRunner, self).__init__()

  def create_server(self, server_data):
    port = self.options.port
    host = self.options.host
    xmpp_port = self.options.xmpp_port
    server = SyncHTTPServer((host, port), xmpp_port, SyncPageHandler)
    print ('Sync HTTP server started at %s:%d/chromiumsync...' %
           (host, server.server_port))
    print ('Fake OAuth2 Token server started at %s:%d/o/oauth2/token...' %
           (host, server.server_port))
    print ('Sync XMPP server started at %s:%d...' %
           (host, server.xmpp_port))
    server_data['port'] = server.server_port
    server_data['xmpp_port'] = server.xmpp_port
    return server

  def run_server(self):
    testserver_base.TestServerRunner.run_server(self)

  def add_options(self):
    testserver_base.TestServerRunner.add_options(self)
    self.option_parser.add_option('--xmpp-port', default='0', type='int',
                                  help='Port used by the XMPP server. If '
                                  'unspecified, the XMPP server will listen on '
                                  'an ephemeral port.')
    # Override the default logfile name used in testserver.py.
    self.option_parser.set_defaults(log_file='sync_testserver.log')

if __name__ == '__main__':
  sys.exit(SyncServerRunner().main())