普通文本  |  325行  |  10.14 KB

# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import json
import os
import sys
import urlparse

from hooks import install

from paste import fileapp
from paste import httpserver

import webapp2
from webapp2 import Route, RedirectHandler

from dashboard_build import dashboard_dev_server_config
from perf_insights_build import perf_insights_dev_server_config
from tracing_build import tracing_dev_server_config

_MAIN_HTML = """<html><body>
<h1>Run Unit Tests</h1>
<ul>
%s
</ul>
<h1>Quick links</h1>
<ul>
%s
</ul>
</body></html>
"""

_QUICK_LINKS = [
    ('Trace File Viewer',
     '/tracing_examples/trace_viewer.html'),
    ('Perf Insights Viewer',
     '/perf_insights_examples/perf_insights_viewer.html')
]

_LINK_ITEM = '<li><a href="%s">%s</a></li>'

def _GetFilesIn(basedir):
  data_files = []
  for dirpath, dirnames, filenames in os.walk(basedir, followlinks=True):
    new_dirnames = [d for d in dirnames if not d.startswith('.')]
    del dirnames[:]
    dirnames += new_dirnames

    for f in filenames:
      if f.startswith('.'):
        continue
      if f == 'README.md':
        continue
      full_f = os.path.join(dirpath, f)
      rel_f = os.path.relpath(full_f, basedir)
      data_files.append(rel_f)

  data_files.sort()
  return data_files


def _RelPathToUnixPath(p):
  return p.replace(os.sep, '/')


class TestResultHandler(webapp2.RequestHandler):
  def post(self, *args, **kwargs):  # pylint: disable=unused-argument
    msg = self.request.body
    ostream = sys.stdout if 'PASSED' in msg else sys.stderr
    ostream.write(msg + '\n')
    return self.response.write('')


class TestsCompletedHandler(webapp2.RequestHandler):
  def post(self, *args, **kwargs):  # pylint: disable=unused-argument
    msg = self.request.body
    sys.stdout.write(msg + '\n')
    exit_code = 0 if 'ALL_PASSED' in msg else 1
    if hasattr(self.app.server, 'please_exit'):
      self.app.server.please_exit(exit_code)
    return self.response.write('')


class DirectoryListingHandler(webapp2.RequestHandler):
  def get(self, *args, **kwargs):  # pylint: disable=unused-argument
    source_path = kwargs.pop('_source_path', None)
    mapped_path = kwargs.pop('_mapped_path', None)
    assert mapped_path.endswith('/')

    data_files_relative_to_top = _GetFilesIn(source_path)
    data_files = [mapped_path + x
                  for x in data_files_relative_to_top]

    files_as_json = json.dumps(data_files)
    self.response.content_type = 'application/json'
    return self.response.write(files_as_json)


class FileAppWithGZipHandling(fileapp.FileApp):
  def guess_type(self):
    content_type, content_encoding = \
        super(FileAppWithGZipHandling, self).guess_type()
    if not self.filename.endswith('.gz'):
      return content_type, content_encoding
    # By default, FileApp serves gzip files as their underlying type with
    # Content-Encoding of gzip. That causes them to show up on the client
    # decompressed. That ends up being surprising to our xhr.html system.
    return None, None

class SourcePathsHandler(webapp2.RequestHandler):
  def get(self, *args, **kwargs):  # pylint: disable=unused-argument
    source_paths = kwargs.pop('_source_paths', [])

    path = self.request.path
    # This is how we do it. Its... strange, but its what we've done since
    # the dawn of time. Aka 4 years ago, lol.
    for mapped_path in source_paths:
      rel = os.path.relpath(path, '/')
      candidate = os.path.join(mapped_path, rel)
      if os.path.exists(candidate):
        app = FileAppWithGZipHandling(candidate)
        app.cache_control(no_cache=True)
        return app
    self.abort(404)

  @staticmethod
  def GetServingPathForAbsFilename(source_paths, filename):
    if not os.path.isabs(filename):
      raise Exception('filename must be an absolute path')

    for mapped_path in source_paths:
      if not filename.startswith(mapped_path):
        continue
      rel = os.path.relpath(filename, mapped_path)
      unix_rel = _RelPathToUnixPath(rel)
      return unix_rel
    return None


class SimpleDirectoryHandler(webapp2.RequestHandler):
  def get(self, *args, **kwargs):  # pylint: disable=unused-argument
    top_path = os.path.abspath(kwargs.pop('_top_path', None))
    if not top_path.endswith(os.path.sep):
      top_path += os.path.sep

    joined_path = os.path.abspath(
        os.path.join(top_path, kwargs.pop('rest_of_path')))
    if not joined_path.startswith(top_path):
      self.response.set_status(403)
      return
    app = FileAppWithGZipHandling(joined_path)
    app.cache_control(no_cache=True)
    return app


class TestOverviewHandler(webapp2.RequestHandler):
  def get(self, *args, **kwargs):  # pylint: disable=unused-argument
    test_links = []
    for name, path in kwargs.pop('pds').iteritems():
      test_links.append(_LINK_ITEM % (path, name))
    quick_links = []
    for name, path in _QUICK_LINKS:
      quick_links.append(_LINK_ITEM % (path, name))
    self.response.out.write(_MAIN_HTML % ('\n'.join(test_links),
                                          '\n'.join(quick_links)))

class DevServerApp(webapp2.WSGIApplication):
  def __init__(self, pds, args):
    super(DevServerApp, self).__init__(debug=True)
    self.pds = pds
    self._server = None
    self._all_source_paths = []
    self._all_mapped_test_data_paths = []
    self._InitFromArgs(args)

  @property
  def server(self):
    return self._server

  @server.setter
  def server(self, server):
    self._server = server

  def _InitFromArgs(self, args):
    default_tests = dict((pd.GetName(), pd.GetRunUnitTestsUrl())
                         for pd in self.pds)
    routes = [
        Route('/tests.html', TestOverviewHandler,
              defaults={'pds': default_tests}),
        Route('', RedirectHandler, defaults={'_uri': '/tests.html'}),
        Route('/', RedirectHandler, defaults={'_uri': '/tests.html'}),
    ]
    for pd in self.pds:
      routes += pd.GetRoutes(args)
      routes += [
          Route('/%s/notify_test_result' % pd.GetName(),
                TestResultHandler),
          Route('/%s/notify_tests_completed' % pd.GetName(),
                TestsCompletedHandler)
      ]

    for pd in self.pds:
      # Test data system.
      for mapped_path, source_path in pd.GetTestDataPaths(args):
        self._all_mapped_test_data_paths.append((mapped_path, source_path))
        routes.append(Route('%s__file_list__' % mapped_path,
                            DirectoryListingHandler,
                            defaults={
                                '_source_path': source_path,
                                '_mapped_path': mapped_path
                            }))
        routes.append(Route('%s<rest_of_path:.+>' % mapped_path,
                            SimpleDirectoryHandler,
                            defaults={'_top_path': source_path}))

    # This must go last, because its catch-all.
    #
    # Its funky that we have to add in the root path. The long term fix is to
    # stop with the crazy multi-source-pathing thing.
    for pd in self.pds:
      self._all_source_paths += pd.GetSourcePaths(args)
    routes.append(
        Route('/<:.+>', SourcePathsHandler,
              defaults={'_source_paths': self._all_source_paths}))

    for route in routes:
      self.router.add(route)

  def GetAbsFilenameForHref(self, href):
    for source_path in self._all_source_paths:
      full_source_path = os.path.abspath(source_path)
      expanded_href_path = os.path.abspath(os.path.join(full_source_path,
                                                        href.lstrip('/')))
      if (os.path.exists(expanded_href_path) and
          os.path.commonprefix([full_source_path,
                                expanded_href_path]) == full_source_path):
        return expanded_href_path
    return None

  def GetURLForAbsFilename(self, filename):
    assert self.server is not None
    for mapped_path, source_path in self._all_mapped_test_data_paths:
      if not filename.startswith(source_path):
        continue
      rel = os.path.relpath(filename, source_path)
      unix_rel = _RelPathToUnixPath(rel)
      url = urlparse.urljoin(mapped_path, unix_rel)
      return url

    path = SourcePathsHandler.GetServingPathForAbsFilename(
        self._all_source_paths, filename)
    if path is None:
      return None
    return urlparse.urljoin('/', path)


def _AddPleaseExitMixinToServer(server):
  # Shutting down httpserver gracefully and yielding a return code requires
  # a bit of mixin code.

  exit_code_attempt = []
  def PleaseExit(exit_code):
    if len(exit_code_attempt) > 0:
      return
    exit_code_attempt.append(exit_code)
    server.running = False

  real_serve_forever = server.serve_forever

  def ServeForever():
    try:
      real_serve_forever()
    except KeyboardInterrupt:
      # allow CTRL+C to shutdown
      return 255

    if len(exit_code_attempt) == 1:
      return exit_code_attempt[0]
    # The serve_forever returned for some reason separate from
    # exit_please.
    return 0

  server.please_exit = PleaseExit
  server.serve_forever = ServeForever


def _AddCommandLineArguments(pds, argv):
  parser = argparse.ArgumentParser(description='Run development server')
  parser.add_argument(
      '--no-install-hooks', dest='install_hooks', action='store_false')
  parser.add_argument('-p', '--port', default=8003, type=int)
  for pd in pds:
    g = parser.add_argument_group(pd.GetName())
    pd.AddOptionstToArgParseGroup(g)
  args = parser.parse_args(args=argv[1:])
  return args


def Main(argv):
  pds = [
      dashboard_dev_server_config.DashboardDevServerConfig(),
      perf_insights_dev_server_config.PerfInsightsDevServerConfig(),
      tracing_dev_server_config.TracingDevServerConfig(),
  ]

  args = _AddCommandLineArguments(pds, argv)

  if args.install_hooks:
    install.InstallHooks()

  app = DevServerApp(pds, args=args)

  server = httpserver.serve(app, host='127.0.0.1', port=args.port,
                            start_loop=False)
  _AddPleaseExitMixinToServer(server)
  # pylint: disable=no-member
  server.urlbase = 'http://127.0.0.1:%i' % server.server_port
  app.server = server

  sys.stderr.write('Now running on %s\n' % server.urlbase)

  return server.serve_forever()