普通文本  |  279行  |  9.91 KB

#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module to upload a MySQL dump file to Cloud SQL.

Usage:
  dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE
                      [REMOTE]

  Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional
  arguments will connect to localhost as root with an empty password.

  positional arguments:
    FILE             text dump file containing MySQL commands
    REMOTE           Cloud SQL instance name or MySQL hostname

  optional arguments:
    -h, --help       show this help message and exit
    --resume NUM     resume dump at command NUM
    --user USER      user (ignored for CloudSQL)
    --passwd PASSWD  passwd (ignored for CloudSQL)
"""

from __future__ import division
import argparse
import collections
import datetime
import os
import re
import sys
import time


BYTES_PER_GB = 2**30


class MySQLConnectionManager(object):
    """Manages connections to a MySQL database.

    Vars:
      factory: A *ConnectionFactory.
      connected: Whether we currently hold a live DB connection.
      cmd_num: The number of commands executed.
    """
    def __init__(self, connection_factory):
        self.factory = connection_factory
        self.connected = False
        self.cmd_num = 0

    def write(self, data, execute_cmd=True, increment_cmd=False):
        """Buffers writes to command boundaries.

        Args:
          data: A line of data from the MySQL dump.
          execute_cmd: Whether to execute the command, defaults to True.
          increment_cmd: Whether to increment cmd_num, defaults to False.
          """
        if not data or not data.strip() or data == '\n' or data[:2] == '--':
            return
        self._cmd += data[:-1] if data[-1] == '\n' else data
        if self._cmd[-1] != ';':
            return
        # Execute command.
        if execute_cmd:
            self._cursor.execute(self._cmd.decode('utf-8'))
        self._cmd = ''
        if increment_cmd:
            self.cmd_num += 1

    def disconnect(self):
      """Closes the current database connection."""
      if self.connected:
          self.connected = False
          self._cursor.close()
          self._db.close()

    def connect(self):
      """Creates a new database connection."""
      self.disconnect()
      self._db = self.factory.connect()
      self.connected = True
      self._cursor = self._db.cursor()
      self._cmd = ''


class CloudSQLConnectionFactory(object):
    """Creates Cloud SQL database connections."""
    def __init__(self, cloudsql_instance):
        self._instance = cloudsql_instance

    def connect(self):
        """Connects to the Cloud SQL database and returns the connection.

        Returns:
          A MySQLdb compatible database connection to the Cloud SQL instance.
        """
        print 'Connecting to Cloud SQL instance %s.' % self._instance
        try:
            from google.storage.speckle.python.api import rdbms_googleapi
        except ImportError:
            sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK '
                     'directory to your PYTHONPATH. Download the SDK from: '
                     'https://developers.google.com/appengine/downloads')
        return rdbms_googleapi.connect(None, instance=self._instance)


class LocalSQLConnectionFactory(object):
    """Creates local MySQL database connections."""
    def __init__(self, host=None, user='root', passwd=''):
        if not host:
          host = 'localhost'
        self._host = host
        self._user = user
        self._passwd = passwd

    def connect(self):
        """Connects to the local MySQL database and returns the connection.

        Returns:
          A MySQLdb database connection to the local MySQL database.
        """
        print 'Connecting to mysql at localhost as %s.' % self._user
        try:
            import MySQLdb
        except ImportError:
            sys.exit('Unable to import MySQLdb. To install on Ubuntu: '
                     'apt-get install python-mysqldb')
        return MySQLdb.connect(host=self._host, user=self._user,
                               passwd=self._passwd)


class MySQLState(object):
    """Maintains the MySQL global state.

    This is a hack that keeps record of all MySQL lines that set global state.
    These are needed to reconstruct the MySQL state on resume.
    """
    _set_regex = re.compile('\S*\s*SET(.*)[\s=]')

    def __init__(self):
        self._db_line = ''
        self._table_lock = []
        self._sets = collections.OrderedDict()

    def process(self, line):
        """Check and save lines that affect the global state.

        Args:
          line: A line from the MySQL dump file.
        """
        # Most recent USE line.
        if line[:3] == 'USE':
            self._db_line = line
        # SET variables.
        m = self._set_regex.match(line)
        if m:
            self._sets[m.group(1).strip()] = line
        # Maintain LOCK TABLES
        if (line[:11] == 'LOCK TABLES' or
            ('ALTER TABLE' in line and 'DISABLE KEYS' in line)):
            self._table_lock.append(line)
        if (line[:14] == 'UNLOCK TABLES;'):
            self._table_lock = []

    def write(self, out):
        """Print lines to recreate the saved state.

        Args:
          out: A File-like object to write out saved state.
        """
        out.write(self._db_line)
        for v in self._sets.itervalues():
            out.write(v)
        for l in self._table_lock:
            out.write(l)

    def breakpoint(self, line):
      """Returns true if we can handle breaking after this line.

      Args:
        line: A line from the MySQL dump file.

      Returns:
        Boolean indicating whether we can break after |line|.
      """
      return (line[:28] == '-- Table structure for table' or
              line[:11] == 'INSERT INTO')


def dump_to_cloudsql(dumpfile, manager, cmd_offset=0):
    """Dumps a MySQL dump file to a database through a MySQLConnectionManager.

    Args:
      dumpfile: Path to a file from which to read the MySQL dump.
      manager: An instance of MySQLConnectionManager.
      cmd_offset: No commands will be executed on the database before this count
        is reached. Used to continue an uncompleted dump. Defaults to 0.
    """
    state = MySQLState()
    total = os.path.getsize(dumpfile)
    start_time = time.time()
    line_num = 0
    with open(dumpfile, 'r') as dump:
        for line in dump:
            line_num += 1
            if not manager.connected:
                manager.connect()
            try:
                # Construct commands from lines and execute them.
                state.process(line)
                if manager.cmd_num == cmd_offset and cmd_offset != 0:
                    print '\nRecreating state at line: %d' % line_num
                    state.write(manager)
                manager.write(line, manager.cmd_num >= cmd_offset, True)
                # Print status.
                sys.stdout.write(
                    '\rstatus:  %.3f%%     %0.2f GB     %d commands ' %
                    (100 * dump.tell() / total, dump.tell() / BYTES_PER_GB,
                     manager.cmd_num))
                sys.stdout.flush()
            # Handle interrupts and connection failures.
            except KeyboardInterrupt:
                print ('\nInterrupted while executing command: %d' %
                       manager.cmd_num)
                raise
            except:
                print '\nFailed while executing command: %d' % manager.cmd_num
                delta = int(time.time() - start_time)
                print 'Total time: %s' % str(datetime.timedelta(seconds=delta))
                if state.breakpoint(line):
                    # Attempt to resume.
                    print ('Execution can resume from here (line = %d)' %
                           line_num)
                    manager.cmd_num += 1
                    cmd_offset = manager.cmd_num
                    print ('Will now attempt to auto-resume at command: %d' %
                           cmd_offset)
                    manager.disconnect()
                else:
                    print 'Execution may fail to resume correctly from here.'
                    print ('Use --resume=%d to attempt to resume the dump.' %
                           manager.cmd_num)
                    raise
    print '\nDone.'


if __name__ == '__main__':
    """Imports a MySQL database from a dump file.

    Interprets command line arguments and calls dump_to_cloudsql appropriately.
    """
    description = """Uploads MySQL dump file to a MySQL database or Cloud SQL.
                  With no optional arguments will connect to localhost as root
                  with an empty password."""
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('mysqldump', metavar='FILE',
                        help='text dump file containing MySQL commands')
    parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE',
        help='either a Cloud SQL account:instance or a hostname')
    parser.add_argument('--resume', default=0, type=int, metavar='NUM',
                        help='resume dump at command NUM')
    parser.add_argument('--user', default='root', metavar='USER',
                        help='user (ignored for Cloud SQL)')
    parser.add_argument('--passwd', default='', metavar='PASSWD',
                        help='passwd (ignored for Cloud SQL)')
    args = parser.parse_args()
    if args.remote and ':' in args.remote:
        connection = CloudSQLConnectionFactory(args.remote)
    else:
        connection = LocalSQLConnectionFactory(args.remote, args.user,
                                               args.passwd)
    if args.resume:
        print 'Resuming execution at command: %d' % options.resume
    dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection),
                     args.resume)