# Plain text  |  296 lines  |  9.26 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The hill genetic algorithm.

Part of the Chrome build flags optimization.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import random

import flags
from flags import Flag
from flags import FlagSet
from generation import Generation
from task import Task


def CrossoverWith(first_flag, second_flag):
  """Choose which parent's gene the child inherits.

  The selection is a plain coin flip between the two inputs; the values are
  not blended. A finer-grained scheme (e.g. an integer mask-over) could be
  substituted here if ever needed.

  Args:
    first_flag: The gene (Flag) offered by the first parent.
    second_flag: The gene (Flag) offered by the second parent.

  Returns:
    first_flag when the coin flip yields 1, second_flag when it yields 0.
  """

  coin = random.randint(0, 1)
  if coin:
    return first_flag
  return second_flag


def RandomMutate(specs, flag_set, mutation_rate):
  """Randomly mutate the content of a task.

  Each spec in specs is independently selected for mutation with probability
  mutation_rate. A selected spec is handled as follows:
    * If it is not in flag_set, a new Flag(spec) is added.
    * If it is in flag_set and flags.Search(spec) finds no numeric range (a
      boolean flag), the flag is dropped.
    * If it is numeric, its value is kept, decremented or (when still below
      the spec's 'end') incremented, chosen uniformly. A value that falls
      below the spec's 'start' drops the flag.
  A spec that is not selected keeps whatever state it has in flag_set: present
  flags are carried over unchanged and absent flags stay absent.

  Args:
    specs: A list of spec from which the flag set is created.
    flag_set: The current flag set being mutated. It is only read; it must
      support 'spec in flag_set' and 'flag_set[spec]'.
    mutation_rate: What fraction of genes to mutate, as a probability. A rate
      of 0 (or less) mutates nothing; a rate of 1 (or more) mutates every spec.

  Returns:
    A Genetic Task constructed by randomly mutating the input flag set.
  """

  results_flags = []

  for spec in specs:
    present = spec in flag_set

    # random.random() lies in [0, 1), so each spec is picked with probability
    # exactly mutation_rate, and a rate of 0 needs no special casing (no
    # division is involved).
    if random.random() >= mutation_rate:
      # Not selected for mutation: the gene is inherited as is.
      if present:
        results_flags.append(flag_set[spec])
      continue

    # If the flag is not already in the flag set, it is added.
    if not present:
      results_flags.append(Flag(spec))
      continue

    # If the flag is already in the flag set, it is mutated.
    numeric_flag_match = flags.Search(spec)

    # The value of a numeric flag will be changed, and a boolean flag will be
    # dropped.
    if not numeric_flag_match:
      continue

    start = int(numeric_flag_match.group('start'))
    end = int(numeric_flag_match.group('end'))
    value = flag_set[spec].GetValue()

    # Randomly select a nearby value of the current value of the flag. The
    # upper neighbour is only offered while it stays below 'end'.
    candidates = [value, value - 1]
    if value + 1 < end:
      candidates.append(value + 1)
    value = random.choice(candidates)

    # If the value is smaller than the start of the spec, this flag will be
    # dropped.
    if value >= start:
      results_flags.append(Flag(spec, value))

  return GATask(FlagSet(results_flags))


class GATask(Task):
  """A Task that can be bred with another task to produce a child task."""

  def __init__(self, flag_set):
    Task.__init__(self, flag_set)

  def ReproduceWith(self, other, specs, mutation_rate):
    """Breed this task with another one.

    Flags owned by exactly one parent are each inherited on a coin flip. Flags
    owned by both parents are always inherited, taking the value from either
    parent via CrossoverWith. The result is then optionally mutated.

    Args:
      other: The task to reproduce with. Its GetFlags().GetFlags() must yield
        a mapping from flag key to Flag, like this task's own.
      specs: A list of spec from which the flag set is created.
      mutation_rate: The mutation rate handed to RandomMutate. A falsy value
        (e.g. 0) disables mutation.

    Returns:
      A GA task made by mixing self with other.
    """

    # The flag dictionaries of both parents.
    father_flags = self.GetFlags().GetFlags()
    mother_flags = other.GetFlags().GetFlags()

    # Partition the flag keys by ownership.
    only_father = [key for key in father_flags if key not in mother_flags]
    only_mother = [key for key in mother_flags if key not in father_flags]
    shared = [key for key in father_flags if key in mother_flags]

    child_flags = []

    # A flag owned by a single parent is passed on with a 50% chance.
    for key in only_father:
      if random.randint(0, 1):
        child_flags.append(father_flags[key])

    for key in only_mother:
      if random.randint(0, 1):
        child_flags.append(mother_flags[key])

    # A flag owned by both parents is always on; only its value is up for
    # grabs between the two parents.
    for key in shared:
      child_flags.append(CrossoverWith(father_flags[key], mother_flags[key]))

    child_set = FlagSet(child_flags)

    if not mutation_rate:
      return GATask(child_set)

    # Mutate flags.
    return RandomMutate(specs, child_set, mutation_rate)


class GAGeneration(Generation):
  """The Genetic Algorithm.

  The tuning parameters are class-level and shared by every generation; they
  are expected to be set once through InitMetaData before Next or IsImproved
  is used.
  """

  # The value checks whether the algorithm has converged and arrives at a fixed
  # point. If STOP_THRESHOLD of generations have not seen any performance
  # improvement, the Genetic Algorithm stops.
  STOP_THRESHOLD = None

  # Number of tasks in each generation.
  NUM_CHROMOSOMES = None

  # The value checks whether the algorithm has converged and arrives at a fixed
  # point. If NUM_TRIALS of trials have been attempted to generate a new task
  # without a success, the Genetic Algorithm stops.
  NUM_TRIALS = None

  # The flags that can be used to generate new tasks.
  SPECS = None

  # What fraction of genes to mutate.
  MUTATION_RATE = 0

  @staticmethod
  def InitMetaData(stop_threshold, num_chromosomes, num_trials, specs,
                   mutation_rate):
    """Set up the meta data for the Genetic Algorithm.

    The values are stored on the GAGeneration class itself, so they apply to
    all generations.

    Args:
      stop_threshold: The number of generations without any performance
        improvement after which the Genetic Algorithm stops.
      num_chromosomes: Number of tasks in each generation.
      num_trials: The number of failed attempts at generating a new task after
        which the Genetic Algorithm stops.
      specs: The flags that can be used to generate new tasks.
      mutation_rate: What fraction of genes to mutate.
    """

    GAGeneration.STOP_THRESHOLD = stop_threshold
    GAGeneration.NUM_CHROMOSOMES = num_chromosomes
    GAGeneration.NUM_TRIALS = num_trials
    GAGeneration.SPECS = specs
    GAGeneration.MUTATION_RATE = mutation_rate

  def __init__(self, tasks, parents, total_stucks):
    """Create one generation of the Genetic Algorithm.

    Args:
      tasks: A set of tasks to be run.
      parents: A set of tasks from which this new generation is produced. This
        set also contains the best tasks generated so far.
      total_stucks: The number of consecutive generations that have not seen
        improvement. IsImproved reports no improvement once this count has
        reached STOP_THRESHOLD defined in the GAGeneration class.
    """

    Generation.__init__(self, tasks, parents)
    self._total_stucks = total_stucks

  def IsImproved(self):
    """True if the search should go on after this generation.

    Besides answering the question, this updates the stuck counter: it is
    reset when the generation beats its parents and incremented otherwise.

    Returns:
      True if this is the first generation, if its best task improves upon the
      best parent task, or if the number of stuck generations is still below
      STOP_THRESHOLD. False once STOP_THRESHOLD has been reached.
    """

    tasks = self.Pool()
    parents = self.CandidatePool()

    # The first generation does not have parents.
    if not parents:
      return True

    # Find out whether a task has improvement upon the best task in the
    # parent generation. The best task is the one with the smallest test
    # result.
    best_parent = min(parents, key=lambda task: task.GetTestResult())
    best_current = min(tasks, key=lambda task: task.GetTestResult())

    # At least one task has improvement.
    if best_current.IsImproved(best_parent):
      self._total_stucks = 0
      return True

    # If STOP_THRESHOLD of generations have no improvement, the algorithm stops.
    if self._total_stucks >= GAGeneration.STOP_THRESHOLD:
      return False

    self._total_stucks += 1
    return True

  def Next(self, cache):
    """Calculate the next generation.

    The best NUM_CHROMOSOMES tasks among this generation's pool and its
    parents are retained. Children are first produced by crossing each
    retained task with a randomly chosen retained task; if that does not
    yield enough new tasks, the remainder is produced by mutating retained
    tasks.

    Args:
      cache: A set of tasks that have been generated before. Children found in
        it are rejected.

    Returns:
      A list holding the single next GAGeneration, or an empty list if a full
      pool of new tasks could not be produced.
    """

    target_len = GAGeneration.NUM_CHROMOSOMES
    max_trials = GAGeneration.NUM_TRIALS
    specs = GAGeneration.SPECS
    mutation_rate = GAGeneration.MUTATION_RATE

    # Collect the tasks of this generation and of its parents. They compete
    # for a place among the retained tasks.
    gen_tasks = list(self.Pool())

    parents = self.CandidatePool()
    if parents:
      gen_tasks.extend(parents)

    # A set of tasks that are the best. This set will be used as the parent
    # generation to produce the next generation.
    sort_func = lambda task: task.GetTestResult()
    retained_tasks = sorted(gen_tasks, key=sort_func)[:target_len]

    child_pool = set()
    for father in retained_tasks:
      num_trials = 0
      # Try up to NUM_TRIALS times to produce a new child for this father.
      while num_trials < max_trials:
        # Randomly select another parent.
        mother = random.choice(retained_tasks)
        # Cross over.
        child = mother.ReproduceWith(father, specs, mutation_rate)
        if child not in child_pool and child not in cache:
          child_pool.add(child)
          break
        num_trials += 1

    # Top the pool up with mutated copies of the retained tasks. Failed
    # attempts are counted across the whole top-up phase. With nothing
    # retained there is nothing to mutate, so the loop must not be entered:
    # it could never make progress.
    num_trials = 0

    while (retained_tasks and len(child_pool) < target_len and
           num_trials < max_trials):
      for keep_task in retained_tasks:
        # Mutation.
        child = RandomMutate(specs, keep_task.GetFlags(), mutation_rate)
        if child not in child_pool and child not in cache:
          child_pool.add(child)
          if len(child_pool) >= target_len:
            break
        else:
          num_trials += 1

    # If a full set of new tasks could not be generated, the algorithm stops.
    # The pool size is the criterion rather than the trial count, so a pool
    # that was completed on the very last attempts is not thrown away.
    if len(child_pool) < target_len:
      return []

    # Neither phase adds more than target_len children.
    assert len(child_pool) == target_len

    return [GAGeneration(child_pool, set(retained_tasks), self._total_stucks)]