#!/usr/bin/env python # Copyright 2013 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This is a python sync server used for testing Chrome Sync. By default, it listens on an ephemeral port and xmpp_port and sends the port numbers back to the originating process over a pipe. The originating process can specify an explicit port and xmpp_port if necessary. """ import asyncore import BaseHTTPServer import errno import os import select import socket import sys import urlparse import chromiumsync import echo_message import testserver_base import xmppserver class SyncHTTPServer(testserver_base.ClientRestrictingServerMixIn, testserver_base.BrokenPipeHandlerMixIn, testserver_base.StoppableHTTPServer): """An HTTP server that handles sync commands.""" def __init__(self, server_address, xmpp_port, request_handler_class): testserver_base.StoppableHTTPServer.__init__(self, server_address, request_handler_class) self._sync_handler = chromiumsync.TestServer() self._xmpp_socket_map = {} self._xmpp_server = xmppserver.XmppServer( self._xmpp_socket_map, ('localhost', xmpp_port)) self.xmpp_port = self._xmpp_server.getsockname()[1] self.authenticated = True def GetXmppServer(self): return self._xmpp_server def HandleCommand(self, query, raw_request): return self._sync_handler.HandleCommand(query, raw_request) def HandleRequestNoBlock(self): """Handles a single request. Copied from SocketServer._handle_request_noblock(). """ try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except Exception: self.handle_error(request, client_address) self.close_request(request) def SetAuthenticated(self, auth_valid): self.authenticated = auth_valid def GetAuthenticated(self): return self.authenticated def serve_forever(self): """This is a merge of asyncore.loop() and SocketServer.serve_forever(). """ def HandleXmppSocket(fd, socket_map, handler): """Runs the handler for the xmpp connection for fd. Adapted from asyncore.read() et al. """ xmpp_connection = socket_map.get(fd) # This could happen if a previous handler call caused fd to get # removed from socket_map. if xmpp_connection is None: return try: handler(xmpp_connection) except (asyncore.ExitNow, KeyboardInterrupt, SystemExit): raise except: xmpp_connection.handle_error() while True: read_fds = [ self.fileno() ] write_fds = [] exceptional_fds = [] for fd, xmpp_connection in self._xmpp_socket_map.items(): is_r = xmpp_connection.readable() is_w = xmpp_connection.writable() if is_r: read_fds.append(fd) if is_w: write_fds.append(fd) if is_r or is_w: exceptional_fds.append(fd) try: read_fds, write_fds, exceptional_fds = ( select.select(read_fds, write_fds, exceptional_fds)) except select.error, err: if err.args[0] != errno.EINTR: raise else: continue for fd in read_fds: if fd == self.fileno(): self.HandleRequestNoBlock() continue HandleXmppSocket(fd, self._xmpp_socket_map, asyncore.dispatcher.handle_read_event) for fd in write_fds: HandleXmppSocket(fd, self._xmpp_socket_map, asyncore.dispatcher.handle_write_event) for fd in exceptional_fds: HandleXmppSocket(fd, self._xmpp_socket_map, asyncore.dispatcher.handle_expt_event) class SyncPageHandler(testserver_base.BasePageHandler): """Handler for the main HTTP sync server.""" def __init__(self, request, client_address, sync_http_server): get_handlers = [self.ChromiumSyncTimeHandler, self.ChromiumSyncMigrationOpHandler, self.ChromiumSyncCredHandler, self.ChromiumSyncXmppCredHandler, self.ChromiumSyncDisableNotificationsOpHandler, self.ChromiumSyncEnableNotificationsOpHandler, self.ChromiumSyncSendNotificationOpHandler, self.ChromiumSyncBirthdayErrorOpHandler, self.ChromiumSyncTransientErrorOpHandler, self.ChromiumSyncErrorOpHandler, self.ChromiumSyncSyncTabFaviconsOpHandler, self.ChromiumSyncCreateSyncedBookmarksOpHandler, self.ChromiumSyncEnableKeystoreEncryptionOpHandler, self.ChromiumSyncRotateKeystoreKeysOpHandler, self.ChromiumSyncEnableManagedUserAcknowledgementHandler, self.ChromiumSyncEnablePreCommitGetUpdateAvoidanceHandler, self.GaiaOAuth2TokenHandler, self.GaiaSetOAuth2TokenResponseHandler, self.TriggerSyncedNotificationHandler, self.SyncedNotificationsPageHandler, self.CustomizeClientCommandHandler] post_handlers = [self.ChromiumSyncCommandHandler, self.ChromiumSyncTimeHandler, self.GaiaOAuth2TokenHandler, self.GaiaSetOAuth2TokenResponseHandler] testserver_base.BasePageHandler.__init__(self, request, client_address, sync_http_server, [], get_handlers, [], post_handlers, []) def ChromiumSyncTimeHandler(self): """Handle Chromium sync .../time requests. The syncer sometimes checks server reachability by examining /time. """ test_name = "/chromiumsync/time" if not self._ShouldHandleRequest(test_name): return False # Chrome hates it if we send a response before reading the request. if self.headers.getheader('content-length'): length = int(self.headers.getheader('content-length')) _raw_request = self.rfile.read(length) self.send_response(200) self.send_header('Content-Type', 'text/plain') self.end_headers() self.wfile.write('0123456789') return True def ChromiumSyncCommandHandler(self): """Handle a chromiumsync command arriving via http. This covers all sync protocol commands: authentication, getupdates, and commit. """ test_name = "/chromiumsync/command" if not self._ShouldHandleRequest(test_name): return False length = int(self.headers.getheader('content-length')) raw_request = self.rfile.read(length) http_response = 200 raw_reply = None if not self.server.GetAuthenticated(): http_response = 401 challenge = 'GoogleLogin realm="http://%s", service="chromiumsync"' % ( self.server.server_address[0]) else: http_response, raw_reply = self.server.HandleCommand( self.path, raw_request) ### Now send the response to the client. ### self.send_response(http_response) if http_response == 401: self.send_header('www-Authenticate', challenge) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncMigrationOpHandler(self): test_name = "/chromiumsync/migrate" if not self._ShouldHandleRequest(test_name): return False http_response, raw_reply = self.server._sync_handler.HandleMigrate( self.path) self.send_response(http_response) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncCredHandler(self): test_name = "/chromiumsync/cred" if not self._ShouldHandleRequest(test_name): return False try: query = urlparse.urlparse(self.path)[4] cred_valid = urlparse.parse_qs(query)['valid'] if cred_valid[0] == 'True': self.server.SetAuthenticated(True) else: self.server.SetAuthenticated(False) except Exception: self.server.SetAuthenticated(False) http_response = 200 raw_reply = 'Authenticated: %s ' % self.server.GetAuthenticated() self.send_response(http_response) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncXmppCredHandler(self): test_name = "/chromiumsync/xmppcred" if not self._ShouldHandleRequest(test_name): return False xmpp_server = self.server.GetXmppServer() try: query = urlparse.urlparse(self.path)[4] cred_valid = urlparse.parse_qs(query)['valid'] if cred_valid[0] == 'True': xmpp_server.SetAuthenticated(True) else: xmpp_server.SetAuthenticated(False) except: xmpp_server.SetAuthenticated(False) http_response = 200 raw_reply = 'XMPP Authenticated: %s ' % xmpp_server.GetAuthenticated() self.send_response(http_response) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncDisableNotificationsOpHandler(self): test_name = "/chromiumsync/disablenotifications" if not self._ShouldHandleRequest(test_name): return False self.server.GetXmppServer().DisableNotifications() result = 200 raw_reply = ('<html><title>Notifications disabled</title>' '<H1>Notifications disabled</H1></html>') self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncEnableNotificationsOpHandler(self): test_name = "/chromiumsync/enablenotifications" if not self._ShouldHandleRequest(test_name): return False self.server.GetXmppServer().EnableNotifications() result = 200 raw_reply = ('<html><title>Notifications enabled</title>' '<H1>Notifications enabled</H1></html>') self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncSendNotificationOpHandler(self): test_name = "/chromiumsync/sendnotification" if not self._ShouldHandleRequest(test_name): return False query = urlparse.urlparse(self.path)[4] query_params = urlparse.parse_qs(query) channel = '' data = '' if 'channel' in query_params: channel = query_params['channel'][0] if 'data' in query_params: data = query_params['data'][0] self.server.GetXmppServer().SendNotification(channel, data) result = 200 raw_reply = ('<html><title>Notification sent</title>' '<H1>Notification sent with channel "%s" ' 'and data "%s"</H1></html>' % (channel, data)) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncBirthdayErrorOpHandler(self): test_name = "/chromiumsync/birthdayerror" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = self.server._sync_handler.HandleCreateBirthdayError() self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncTransientErrorOpHandler(self): test_name = "/chromiumsync/transienterror" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = self.server._sync_handler.HandleSetTransientError() self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncErrorOpHandler(self): test_name = "/chromiumsync/error" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = self.server._sync_handler.HandleSetInducedError( self.path) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncSyncTabFaviconsOpHandler(self): test_name = "/chromiumsync/synctabfavicons" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = self.server._sync_handler.HandleSetSyncTabFavicons() self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncCreateSyncedBookmarksOpHandler(self): test_name = "/chromiumsync/createsyncedbookmarks" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = self.server._sync_handler.HandleCreateSyncedBookmarks() self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncEnableKeystoreEncryptionOpHandler(self): test_name = "/chromiumsync/enablekeystoreencryption" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = ( self.server._sync_handler.HandleEnableKeystoreEncryption()) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncRotateKeystoreKeysOpHandler(self): test_name = "/chromiumsync/rotatekeystorekeys" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = ( self.server._sync_handler.HandleRotateKeystoreKeys()) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncEnableManagedUserAcknowledgementHandler(self): test_name = "/chromiumsync/enablemanageduseracknowledgement" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = ( self.server._sync_handler.HandleEnableManagedUserAcknowledgement()) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def ChromiumSyncEnablePreCommitGetUpdateAvoidanceHandler(self): test_name = "/chromiumsync/enableprecommitgetupdateavoidance" if not self._ShouldHandleRequest(test_name): return False result, raw_reply = ( self.server._sync_handler.HandleEnablePreCommitGetUpdateAvoidance()) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def GaiaOAuth2TokenHandler(self): test_name = "/o/oauth2/token" if not self._ShouldHandleRequest(test_name): return False if self.headers.getheader('content-length'): length = int(self.headers.getheader('content-length')) _raw_request = self.rfile.read(length) result, raw_reply = ( self.server._sync_handler.HandleGetOauth2Token()) self.send_response(result) self.send_header('Content-Type', 'application/json') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def GaiaSetOAuth2TokenResponseHandler(self): test_name = "/setfakeoauth2token" if not self._ShouldHandleRequest(test_name): return False # The index of 'query' is 4. # See http://docs.python.org/2/library/urlparse.html query = urlparse.urlparse(self.path)[4] query_params = urlparse.parse_qs(query) response_code = 0 request_token = '' access_token = '' expires_in = 0 token_type = '' if 'response_code' in query_params: response_code = query_params['response_code'][0] if 'request_token' in query_params: request_token = query_params['request_token'][0] if 'access_token' in query_params: access_token = query_params['access_token'][0] if 'expires_in' in query_params: expires_in = query_params['expires_in'][0] if 'token_type' in query_params: token_type = query_params['token_type'][0] result, raw_reply = ( self.server._sync_handler.HandleSetOauth2Token( response_code, request_token, access_token, expires_in, token_type)) self.send_response(result) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(raw_reply)) self.end_headers() self.wfile.write(raw_reply) return True def TriggerSyncedNotificationHandler(self): test_name = "/triggersyncednotification" if not self._ShouldHandleRequest(test_name): return False query = urlparse.urlparse(self.path)[4] query_params = urlparse.parse_qs(query) serialized_notification = '' if 'serialized_notification' in query_params: serialized_notification = query_params['serialized_notification'][0] try: notification_string = self.server._sync_handler.account \ .AddSyncedNotification(serialized_notification) reply = "A synced notification was triggered:\n\n" reply += "<code>{}</code>.".format(notification_string) response_code = 200 except chromiumsync.ClientNotConnectedError: reply = ('The client is not connected to the server, so the notification' ' could not be created.') response_code = 400 self.send_response(response_code) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(reply)) self.end_headers() self.wfile.write(reply) return True def CustomizeClientCommandHandler(self): test_name = "/customizeclientcommand" if not self._ShouldHandleRequest(test_name): return False query = urlparse.urlparse(self.path)[4] query_params = urlparse.parse_qs(query) if 'sessions_commit_delay_seconds' in query_params: sessions_commit_delay = query_params['sessions_commit_delay_seconds'][0] try: command_string = self.server._sync_handler.CustomizeClientCommand( int(sessions_commit_delay)) response_code = 200 reply = "The ClientCommand was customized:\n\n" reply += "<code>{}</code>.".format(command_string) except ValueError: response_code = 400 reply = "sessions_commit_delay_seconds was not an int" else: response_code = 400 reply = "sessions_commit_delay_seconds is required" self.send_response(response_code) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(reply)) self.end_headers() self.wfile.write(reply) return True def SyncedNotificationsPageHandler(self): test_name = "/syncednotifications" if not self._ShouldHandleRequest(test_name): return False html = open('sync/tools/testserver/synced_notifications.html', 'r').read() self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(html)) self.end_headers() self.wfile.write(html) return True class SyncServerRunner(testserver_base.TestServerRunner): """TestServerRunner for the net test servers.""" def __init__(self): super(SyncServerRunner, self).__init__() def create_server(self, server_data): port = self.options.port host = self.options.host xmpp_port = self.options.xmpp_port server = SyncHTTPServer((host, port), xmpp_port, SyncPageHandler) print ('Sync HTTP server started at %s:%d/chromiumsync...' % (host, server.server_port)) print ('Fake OAuth2 Token server started at %s:%d/o/oauth2/token...' % (host, server.server_port)) print ('Sync XMPP server started at %s:%d...' % (host, server.xmpp_port)) server_data['port'] = server.server_port server_data['xmpp_port'] = server.xmpp_port return server def run_server(self): testserver_base.TestServerRunner.run_server(self) def add_options(self): testserver_base.TestServerRunner.add_options(self) self.option_parser.add_option('--xmpp-port', default='0', type='int', help='Port used by the XMPP server. If ' 'unspecified, the XMPP server will listen on ' 'an ephemeral port.') # Override the default logfile name used in testserver.py. self.option_parser.set_defaults(log_file='sync_testserver.log') if __name__ == '__main__': sys.exit(SyncServerRunner().main())