# Plain text  |  226 lines  |  5.85 KB

"""A simple script to backfill tko_task_references table with throttling."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import collections
import contextlib
import logging
import time

import MySQLdb


class BackfillException(Exception):
  """Error raised by this script when the backfill cannot proceed or verify."""


def _parse_args(argv=None):
  """Parse the command line and apply dry-run defaults.

  Args:
    argv: Optional list of argument strings. When None, argparse falls back
      to sys.argv[1:], which is the original behavior.

  Returns:
    An argparse.Namespace with host, user, password, dryrun, num_iterations,
    batch_size and sleep_seconds. In dry-run mode num_iterations is forced
    to 5 when it was not given (or given as 0).
  """
  parser = argparse.ArgumentParser(
      description=__doc__)
  parser.add_argument('--host', required=True, help='mysql server host')
  parser.add_argument('--user', required=True, help='mysql server user')
  parser.add_argument('--password', required=True, help='mysql server password')
  parser.add_argument('--dryrun', action='store_true', default=False)
  parser.add_argument(
      '--num-iterations',
      default=None,
      type=int,
      help='If set, total number of iterations. Default is no limit.',
  )
  parser.add_argument(
      '--batch-size',
      # Parsed as an int so a malformed value is rejected here instead of
      # being carried along as a string into the LIMIT clause.
      type=int,
      default=1000,
      help='Number of tko_jobs rows to read in one iteration',
  )
  parser.add_argument(
      '--sleep-seconds',
      type=int,
      default=1,
      help='Time to sleep between iterations',
  )

  args = parser.parse_args(argv)
  if args.dryrun:
    # A dry run never needs to walk the whole table; cap it unless the user
    # asked for a specific number of iterations.
    if not args.num_iterations:
      logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
      args.num_iterations = 5
  return args



@contextlib.contextmanager
def _mysql_connection(args):
  """Yield a MySQLdb connection with chromeos_autotest_db selected.

  Args:
    args: Parsed command-line namespace; user, host and password are read.

  Yields:
    The open connection. It is closed when the with-block exits, and also
    when selecting the database fails.
  """
  conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password)
  try:
    # Selecting the schema happens inside the try so that a failure here
    # still closes the freshly opened connection.
    with _mysql_cursor(conn) as c:
      c.execute('USE chromeos_autotest_db;')
    yield conn
  finally:
    conn.close()


@contextlib.contextmanager
def _autocommit(conn):
  """Commit when the with-block succeeds; roll back and re-raise when it fails.

  Args:
    conn: A connection object exposing commit() and rollback().

  Yields:
    The same connection that was passed in.

  Raises:
    Whatever the with-block raised, after the transaction is rolled back.
  """
  try:
    yield conn
  except BaseException:
    # Undo the partial transaction, then propagate. Swallowing the error here
    # would make a failed statement look like a success to the caller.
    conn.rollback()
    raise
  else:
    conn.commit()


@contextlib.contextmanager
def _mysql_cursor(conn):
  """Yield a new cursor from conn and close it when the with-block exits."""
  # closing() calls cursor.close() on the way out, whether or not the
  # with-block raised.
  with contextlib.closing(conn.cursor()) as cursor:
    yield cursor


def _latest_unfilled_job_idx(conn):
  """Find the tko_jobs index from which backfilling should start.

  If tko_task_references already has rows, the starting point is one below
  the smallest tko_job_idx present there. Otherwise it is the largest job_idx
  in tko_jobs.

  Args:
    conn: An open database connection.

  Returns:
    The index as a decimal string, or None when neither table has rows.
  """
  with _mysql_cursor(conn) as c:
    c.execute("""
SELECT tko_job_idx
FROM tko_task_references
ORDER BY tko_job_idx
LIMIT 1
;""")
    r = c.fetchall()
    if r:
      # int() rather than long(): long does not exist on Python 3, and int
      # promotes to arbitrary precision on Python 2 anyway.
      return str(int(r[0][0]) - 1)
  logging.debug('tko_task_references is empty.'
               ' Grabbing the latest tko_job_idx to fill.')
  with _mysql_cursor(conn) as c:
    c.execute("""
SELECT job_idx
FROM tko_jobs
ORDER BY job_idx DESC
LIMIT 1
;""")
    r = c.fetchall()
    if r:
      # Same string form as the branch above so callers always get one type.
      return str(int(r[0][0]))
  return None


# One task reference row: the tko_jobs index plus the task id and parent task
# id to be recorded for it.
_TKOTaskReference = collections.namedtuple(
    '_TKOTaskReference',
    'tko_job_idx task_reference parent_task_reference',
)

# Selects up to batch_size tko_jobs rows whose job_idx is at or below
# latest_job_idx, highest job_idx first. The named placeholders are filled in
# with Python %-formatting before the statement is executed.
_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""
# Multi-row insert into tko_task_references; `values` is a pre-rendered,
# comma-separated list of value tuples.
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""
# Looks up a single tko_task_references row by tko_job_idx; used to check
# that an insert landed.
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""


def _compute_task_references(conn, latest_job_idx, batch_size):
  """Read one batch of tko_jobs rows and turn them into task references.

  Args:
    conn: An open database connection.
    latest_job_idx: Highest job_idx to include (int or numeric string).
    batch_size: Maximum number of rows to read (int or numeric string).

  Returns:
    A list of _TKOTaskReference, ordered by descending job_idx; empty when
    no rows match.

  Raises:
    ValueError: if latest_job_idx or batch_size is not an integer value.
  """
  # Both values are spliced directly into the SQL text, so force them through
  # int() first: anything that is not a plain integer is rejected here rather
  # than reaching the server.
  sql = _SQL_SELECT_TASK_REFERENCES % {
      'latest_job_idx': int(latest_job_idx),
      'batch_size': int(batch_size),
  }
  with _mysql_cursor(conn) as c:
    c.execute(sql)
    rows = c.fetchall()
  return [_TKOTaskReference(row[0], row[1], row[2]) for row in rows or ()]


def _insert_task_references(conn, task_references, dryrun):
  """Insert a batch of task references in a single statement.

  Args:
    conn: An open database connection.
    task_references: Sequence of _TKOTaskReference to insert.
    dryrun: When true, only log (a possibly truncated form of) the statement
      and do not touch the database.
  """
  if not task_references:
    # An empty VALUES list would be a SQL syntax error; nothing to do.
    return

  # NOTE(review): a reference that is None is rendered as the text "None"
  # by the %-formatting below -- confirm that is what the table should hold.
  values = ', '.join([
      '("afe", %s, "%s", "%s")' %
      (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
      for tr in task_references
  ])
  sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}
  if dryrun:
    if len(sql) < 200:
      sql_log = sql
    else:
      # Keep the log line readable: show only the head and tail of the
      # statement.
      sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
    logging.debug('Would have run: %s', sql_log)
    # Dry run must stop here; falling through would execute the INSERT.
    return
  with _autocommit(conn) as txn:
    with _mysql_cursor(txn) as c:
      c.execute(sql)


def _verify_task_references(conn, task_references):
  """Spot-check an insert by looking up the last reference of the batch.

  Args:
    conn: An open database connection.
    task_references: The batch that was just inserted; may be empty, in
      which case nothing is checked.

  Raises:
    BackfillException: if the last reference's tko_job_idx is not found.
  """
  if len(task_references) == 0:
    return

  # Only the final element is checked, not the whole batch.
  expected_idx = task_references[-1].tko_job_idx
  query = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': expected_idx}
  with _mysql_cursor(conn) as cursor:
    cursor.execute(query)
    rows = cursor.fetchall()
    if rows and rows[0][0] == expected_idx:
      return
    raise BackfillException(
        'Failed to insert task reference for tko_job_id %s' % expected_idx)


def _next_job_idx(task_references):
  """Return the index just below the last reference in the batch, as a string.

  Args:
    task_references: Non-empty sequence whose last element has a tko_job_idx
      attribute holding an integer or numeric string.

  Returns:
    str of (last tko_job_idx - 1), the starting point for the next batch.
  """
  # int() instead of long(): long is undefined on Python 3, and int has
  # arbitrary precision there (and auto-promotes on Python 2).
  return str(int(task_references[-1].tko_job_idx) - 1)

def main():
  """Run the throttled backfill loop.

  Determines the starting tko_job_idx, then repeatedly reads a batch of
  tko_jobs rows, inserts the matching task references, verifies the insert
  (skipped in dry-run mode) and sleeps. A fresh connection is opened for
  each database step. The loop ends when a batch comes back empty or when
  the configured number of iterations has been used up.

  Raises:
    BackfillException: if no starting index can be determined.
  """
  logging.basicConfig(level=logging.DEBUG)
  args = _parse_args()
  with _mysql_connection(args) as db:
    tko_job_idx = _latest_unfilled_job_idx(db)
    if tko_job_idx is None:
      raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', tko_job_idx)

  # None means "no limit"; otherwise counts down to zero.
  iterations_left = args.num_iterations
  while True:
    logging.info('####################################')
    logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)

    with _mysql_connection(args) as db:
      batch = _compute_task_references(db, tko_job_idx, args.batch_size)
    if not batch:
      logging.info('No more unfilled task references. All done!')
      return

    first_ref, last_ref = batch[0], batch[-1]
    logging.info(
        'Inserting %d task references. tko_job_ids: %d...%d',
        len(batch),
        first_ref.tko_job_idx,
        last_ref.tko_job_idx,
    )
    with _mysql_connection(args) as db:
      _insert_task_references(db, batch, args.dryrun)
    if not args.dryrun:
      with _mysql_connection(args) as db:
        _verify_task_references(db, batch)

    # Continue just below the last row handled in this batch.
    tko_job_idx = _next_job_idx(batch)

    if iterations_left is not None:
      iterations_left -= 1
      if iterations_left <= 0:
        return
      logging.info('%d more iterations left', iterations_left)
    logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds)
    time.sleep(args.sleep_seconds)


# Run the backfill only when executed as a script, not when imported.
if __name__ == '__main__':
  main()