// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/sequenced_worker_pool.h"

#include <list>
#include <map>
#include <set>
#include <utility>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
#include "base/debug/trace_event.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/linked_ptr.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/tracked_objects.h"

#if defined(OS_MACOSX)
#include "base/mac/scoped_nsautorelease_pool.h"
#endif

#if !defined(OS_NACL)
#include "base/metrics/histogram.h"
#endif

namespace base {

namespace {

struct SequencedTask : public TrackingInfo  {
  SequencedTask()
      : sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  explicit SequencedTask(const tracked_objects::Location& from_here)
      : base::TrackingInfo(from_here, TimeTicks()),
        sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  ~SequencedTask() {}

  int sequence_token_id;
  int trace_id;
  int64 sequence_task_number;
  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
  tracked_objects::Location posted_from;
  Closure task;

  // Non-delayed tasks and delayed tasks are managed together by time-to-run
  // order. We calculate the time by adding the posted time and the given delay.
  TimeTicks time_to_run;
};

struct SequencedTaskLessThan {
 public:
  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
    if (lhs.time_to_run < rhs.time_to_run)
      return true;

    if (lhs.time_to_run > rhs.time_to_run)
      return false;

    // If the time happen to match, then we use the sequence number to decide.
    return lhs.sequence_task_number < rhs.sequence_task_number;
  }
};

// SequencedWorkerPoolTaskRunner ---------------------------------------------
// A TaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed ShutdownBehavior.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolTaskRunner : public TaskRunner {
 public:
  SequencedWorkerPoolTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                               const Closure& task,
                               TimeDelta delay) OVERRIDE;
  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;

 private:
  virtual ~SequencedWorkerPoolTaskRunner();

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};

SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool),
      shutdown_behavior_(shutdown_behavior) {
}

SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
}

bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay == TimeDelta()) {
    return pool_->PostWorkerTaskWithShutdownBehavior(
        from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->RunsTasksOnCurrentThread();
}

// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed sequence token.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
 public:
  SequencedWorkerPoolSequencedTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::SequenceToken token,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                               const Closure& task,
                               TimeDelta delay) OVERRIDE;
  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;

  // SequencedTaskRunner implementation
  virtual bool PostNonNestableDelayedTask(
      const tracked_objects::Location& from_here,
      const Closure& task,
      TimeDelta delay) OVERRIDE;

 private:
  virtual ~SequencedWorkerPoolSequencedTaskRunner();

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::SequenceToken token_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};

SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::SequenceToken token,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool),
      token_(token),
      shutdown_behavior_(shutdown_behavior) {
}

SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner() {
}

bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay == TimeDelta()) {
    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
        token_, from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
}

bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->IsRunningSequenceOnCurrentThread(token_);
}

bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  // There's no way to run nested tasks, so simply forward to
  // PostDelayedTask.
  return PostDelayedTask(from_here, task, delay);
}

// Create a process-wide unique ID to represent this task in trace events. This
// will be mangled with a Process ID hash to reduce the likelyhood of colliding
// with MessageLoop pointers on other processes.
uint64 GetTaskTraceID(const SequencedTask& task,
                      void* pool) {
  return (static_cast<uint64>(task.trace_id) << 32) |
         static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
}

base::LazyInstance<base::ThreadLocalPointer<
    SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
        LAZY_INSTANCE_INITIALIZER;

}  // namespace

// Worker ---------------------------------------------------------------------

class SequencedWorkerPool::Worker : public SimpleThread {
 public:
  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
  // around as long as we are running.
  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
         int thread_number,
         const std::string& thread_name_prefix);
  virtual ~Worker();

  // SimpleThread implementation. This actually runs the background thread.
  virtual void Run() OVERRIDE;

  void set_running_task_info(SequenceToken token,
                             WorkerShutdown shutdown_behavior) {
    running_sequence_ = token;
    running_shutdown_behavior_ = shutdown_behavior;
  }

  SequenceToken running_sequence() const {
    return running_sequence_;
  }

  WorkerShutdown running_shutdown_behavior() const {
    return running_shutdown_behavior_;
  }

 private:
  scoped_refptr<SequencedWorkerPool> worker_pool_;
  SequenceToken running_sequence_;
  WorkerShutdown running_shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(Worker);
};

// Inner ----------------------------------------------------------------------

class SequencedWorkerPool::Inner {
 public:
  // Take a raw pointer to |worker| to avoid cycles (since we're owned
  // by it).
  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
        const std::string& thread_name_prefix,
        TestingObserver* observer);

  ~Inner();

  SequenceToken GetSequenceToken();

  SequenceToken GetNamedSequenceToken(const std::string& name);

  // This function accepts a name and an ID. If the name is null, the
  // token ID is used. This allows us to implement the optional name lookup
  // from a single function without having to enter the lock a separate time.
  bool PostTask(const std::string* optional_token_name,
                SequenceToken sequence_token,
                WorkerShutdown shutdown_behavior,
                const tracked_objects::Location& from_here,
                const Closure& task,
                TimeDelta delay);

  bool RunsTasksOnCurrentThread() const;

  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  void CleanupForTesting();

  void SignalHasWorkForTesting();

  int GetWorkSignalCountForTesting() const;

  void Shutdown(int max_blocking_tasks_after_shutdown);

  bool IsShutdownInProgress();

  // Runs the worker loop on the background thread.
  void ThreadLoop(Worker* this_worker);

 private:
  enum GetWorkStatus {
    GET_WORK_FOUND,
    GET_WORK_NOT_FOUND,
    GET_WORK_WAIT,
  };

  enum CleanupState {
    CLEANUP_REQUESTED,
    CLEANUP_STARTING,
    CLEANUP_RUNNING,
    CLEANUP_FINISHING,
    CLEANUP_DONE,
  };

  // Called from within the lock, this converts the given token name into a
  // token ID, creating a new one if necessary.
  int LockedGetNamedTokenID(const std::string& name);

  // Called from within the lock, this returns the next sequence task number.
  int64 LockedGetNextSequenceTaskNumber();

  // Called from within the lock, returns the shutdown behavior of the task
  // running on the currently executing worker thread. If invoked from a thread
  // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
  WorkerShutdown LockedCurrentThreadShutdownBehavior() const;

  // Gets new task. There are 3 cases depending on the return value:
  //
  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
  //    be run immediately.
  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
  //    and |task| is not filled in. In this case, the caller should wait until
  //    a task is posted.
  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
  //    immediately, and |task| is not filled in. Likewise, |wait_time| is
  //    filled in the time to wait until the next task to run. In this case, the
  //    caller should wait the time.
  //
  // In any case, the calling code should clear the given
  // delete_these_outside_lock vector the next time the lock is released.
  // See the implementation for a more detailed description.
  GetWorkStatus GetWork(SequencedTask* task,
                        TimeDelta* wait_time,
                        std::vector<Closure>* delete_these_outside_lock);

  void HandleCleanup();

  // Peforms init and cleanup around running the given task. WillRun...
  // returns the value from PrepareToStartAdditionalThreadIfNecessary.
  // The calling code should call FinishStartingAdditionalThread once the
  // lock is released if the return values is nonzero.
  int WillRunWorkerTask(const SequencedTask& task);
  void DidRunWorkerTask(const SequencedTask& task);

  // Returns true if there are no threads currently running the given
  // sequence token.
  bool IsSequenceTokenRunnable(int sequence_token_id) const;

  // Checks if all threads are busy and the addition of one more could run an
  // additional task waiting in the queue. This must be called from within
  // the lock.
  //
  // If another thread is helpful, this will mark the thread as being in the
  // process of starting and returns the index of the new thread which will be
  // 0 or more. The caller should then call FinishStartingAdditionalThread to
  // complete initialization once the lock is released.
  //
  // If another thread is not necessary, returne 0;
  //
  // See the implementedion for more.
  int PrepareToStartAdditionalThreadIfHelpful();

  // The second part of thread creation after
  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
  // generated. This actually creates the thread and should be called outside
  // the lock to avoid blocking important work starting a thread in the lock.
  void FinishStartingAdditionalThread(int thread_number);

  // Signal |has_work_| and increment |has_work_signal_count_|.
  void SignalHasWork();

  // Checks whether there is work left that's blocking shutdown. Must be
  // called inside the lock.
  bool CanShutdown() const;

  SequencedWorkerPool* const worker_pool_;

  // The last sequence number used. Managed by GetSequenceToken, since this
  // only does threadsafe increment operations, you do not need to hold the
  // lock. This is class-static to make SequenceTokens issued by
  // GetSequenceToken unique across SequencedWorkerPool instances.
  static base::StaticAtomicSequenceNumber g_last_sequence_number_;

  // This lock protects |everything in this class|. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are posted or shutdown starts.
  ConditionVariable has_work_cv_;

  // Condition variable that is waited on by non-worker threads (in
  // Shutdown()) until CanShutdown() goes to true.
  ConditionVariable can_shutdown_cv_;

  // The maximum number of worker threads we'll create.
  const size_t max_threads_;

  const std::string thread_name_prefix_;

  // Associates all known sequence token names with their IDs.
  std::map<std::string, int> named_sequence_tokens_;

  // Owning pointers to all threads we've created so far, indexed by
  // ID. Since we lazily create threads, this may be less than
  // max_threads_ and will be initially empty.
  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
  ThreadMap threads_;

  // Set to true when we're in the process of creating another thread.
  // See PrepareToStartAdditionalThreadIfHelpful for more.
  bool thread_being_created_;

  // Number of threads currently waiting for work.
  size_t waiting_thread_count_;

  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
  // or SKIP_ON_SHUTDOWN flag set.
  size_t blocking_shutdown_thread_count_;

  // A set of all pending tasks in time-to-run order. These are tasks that are
  // either waiting for a thread to run on, waiting for their time to run,
  // or blocked on a previous task in their sequence. We have to iterate over
  // the tasks by time-to-run order, so we use the set instead of the
  // traditional priority_queue.
  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
  PendingTaskSet pending_tasks_;

  // The next sequence number for a new sequenced task.
  int64 next_sequence_task_number_;

  // Number of tasks in the pending_tasks_ list that are marked as blocking
  // shutdown.
  size_t blocking_shutdown_pending_task_count_;

  // Lists all sequence tokens currently executing.
  std::set<int> current_sequences_;

  // An ID for each posted task to distinguish the task from others in traces.
  int trace_id_;

  // Set when Shutdown is called and no further tasks should be
  // allowed, though we may still be running existing tasks.
  bool shutdown_called_;

  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
  // has been called.
  int max_blocking_tasks_after_shutdown_;

  // State used to cleanup for testing, all guarded by lock_.
  CleanupState cleanup_state_;
  size_t cleanup_idlers_;
  ConditionVariable cleanup_cv_;

  TestingObserver* const testing_observer_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};

// Worker definitions ---------------------------------------------------------

SequencedWorkerPool::Worker::Worker(
    const scoped_refptr<SequencedWorkerPool>& worker_pool,
    int thread_number,
    const std::string& prefix)
    : SimpleThread(
          prefix + StringPrintf("Worker%d", thread_number).c_str()),
      worker_pool_(worker_pool),
      running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
  Start();
}

SequencedWorkerPool::Worker::~Worker() {
}

void SequencedWorkerPool::Worker::Run() {
  // Store a pointer to the running sequence in thread local storage for
  // static function access.
  g_lazy_tls_ptr.Get().Set(&running_sequence_);

  // Just jump back to the Inner object to run the thread, since it has all the
  // tracking information and queues. It might be more natural to implement
  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
  // having these worker objects at all, but that method lacks the ability to
  // send thread-specific information easily to the thread loop.
  worker_pool_->inner_->ThreadLoop(this);
  // Release our cyclic reference once we're done.
  worker_pool_ = NULL;
}

// Inner definitions ---------------------------------------------------------

SequencedWorkerPool::Inner::Inner(
    SequencedWorkerPool* worker_pool,
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : worker_pool_(worker_pool),
      lock_(),
      has_work_cv_(&lock_),
      can_shutdown_cv_(&lock_),
      max_threads_(max_threads),
      thread_name_prefix_(thread_name_prefix),
      thread_being_created_(false),
      waiting_thread_count_(0),
      blocking_shutdown_thread_count_(0),
      next_sequence_task_number_(0),
      blocking_shutdown_pending_task_count_(0),
      trace_id_(0),
      shutdown_called_(false),
      max_blocking_tasks_after_shutdown_(0),
      cleanup_state_(CLEANUP_DONE),
      cleanup_idlers_(0),
      cleanup_cv_(&lock_),
      testing_observer_(observer) {}

SequencedWorkerPool::Inner::~Inner() {
  // You must call Shutdown() before destroying the pool.
  DCHECK(shutdown_called_);

  // Need to explicitly join with the threads before they're destroyed or else
  // they will be running when our object is half torn down.
  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
    it->second->Join();
  threads_.clear();

  if (testing_observer_)
    testing_observer_->OnDestruct();
}

SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetSequenceToken() {
  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
  // is used as a sentinel value in SequenceTokens.
  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
}

SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
  AutoLock lock(lock_);
  return SequenceToken(LockedGetNamedTokenID(name));
}

bool SequencedWorkerPool::Inner::PostTask(
    const std::string* optional_token_name,
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
  SequencedTask sequenced(from_here);
  sequenced.sequence_token_id = sequence_token.id_;
  sequenced.shutdown_behavior = shutdown_behavior;
  sequenced.posted_from = from_here;
  sequenced.task =
      shutdown_behavior == BLOCK_SHUTDOWN ?
      base::MakeCriticalClosure(task) : task;
  sequenced.time_to_run = TimeTicks::Now() + delay;

  int create_thread_id = 0;
  {
    AutoLock lock(lock_);
    if (shutdown_called_) {
      if (shutdown_behavior != BLOCK_SHUTDOWN ||
          LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
        return false;
      }
      if (max_blocking_tasks_after_shutdown_ <= 0) {
        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
        return false;
      }
      max_blocking_tasks_after_shutdown_ -= 1;
    }

    // The trace_id is used for identifying the task in about:tracing.
    sequenced.trace_id = trace_id_++;

    TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask",
        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));

    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();

    // Now that we have the lock, apply the named token rules.
    if (optional_token_name)
      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);

    pending_tasks_.insert(sequenced);
    if (shutdown_behavior == BLOCK_SHUTDOWN)
      blocking_shutdown_pending_task_count_++;

    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
  }

  // Actually start the additional thread or signal an existing one now that
  // we're outside the lock.
  if (create_thread_id)
    FinishStartingAdditionalThread(create_thread_id);
  else
    SignalHasWork();

  return true;
}

bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
  AutoLock lock(lock_);
  return ContainsKey(threads_, PlatformThread::CurrentId());
}

bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  AutoLock lock(lock_);
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  if (found == threads_.end())
    return false;
  return sequence_token.Equals(found->second->running_sequence());
}

// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
  DCHECK(!RunsTasksOnCurrentThread());
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  AutoLock lock(lock_);
  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
  if (shutdown_called_)
    return;
  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
    return;
  cleanup_state_ = CLEANUP_REQUESTED;
  cleanup_idlers_ = 0;
  has_work_cv_.Signal();
  while (cleanup_state_ != CLEANUP_DONE)
    cleanup_cv_.Wait();
}

void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
  SignalHasWork();
}

void SequencedWorkerPool::Inner::Shutdown(
    int max_new_blocking_tasks_after_shutdown) {
  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
  {
    AutoLock lock(lock_);
    // Cleanup and Shutdown should not be called concurrently.
    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
    if (shutdown_called_)
      return;
    shutdown_called_ = true;
    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;

    // Tickle the threads. This will wake up a waiting one so it will know that
    // it can exit, which in turn will wake up any other waiting ones.
    SignalHasWork();

    // There are no pending or running tasks blocking shutdown, we're done.
    if (CanShutdown())
      return;
  }

  // If we're here, then something is blocking shutdown.  So wait for
  // CanShutdown() to go to true.

  if (testing_observer_)
    testing_observer_->WillWaitForShutdown();

#if !defined(OS_NACL)
  TimeTicks shutdown_wait_begin = TimeTicks::Now();
#endif

  {
    base::ThreadRestrictions::ScopedAllowWait allow_wait;
    AutoLock lock(lock_);
    while (!CanShutdown())
      can_shutdown_cv_.Wait();
  }
#if !defined(OS_NACL)
  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
                      TimeTicks::Now() - shutdown_wait_begin);
#endif
}

bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
    AutoLock lock(lock_);
    return shutdown_called_;
}

void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    std::pair<ThreadMap::iterator, bool> result =
        threads_.insert(
            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
        TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop",
                     "src_file", task.posted_from.file_name(),
                     "src_func", task.posted_from.function_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TrackedTime start_time =
              tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);

          task.task.Run();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
              start_time, tracked_objects::ThreadData::NowForEndOfRun());

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_oustide_lock.
          // Also, do it before calling set_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->set_running_task_info(
              SequenceToken(), CONTINUE_ON_SHUTDOWN);
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        switch (status) {
          case GET_WORK_WAIT: {
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can
        // shut down, other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ &&
            blocking_shutdown_pending_task_count_ == 0)
          break;
        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}

void SequencedWorkerPool::Inner::HandleCleanup() {
  lock_.AssertAcquired();
  if (cleanup_state_ == CLEANUP_DONE)
    return;
  if (cleanup_state_ == CLEANUP_REQUESTED) {
    // We win, we get to do the cleanup as soon as the others wise up and idle.
    cleanup_state_ = CLEANUP_STARTING;
    while (thread_being_created_ ||
           cleanup_idlers_ != threads_.size() - 1) {
      has_work_cv_.Signal();
      cleanup_cv_.Wait();
    }
    cleanup_state_ = CLEANUP_RUNNING;
    return;
  }
  if (cleanup_state_ == CLEANUP_STARTING) {
    // Another worker thread is cleaning up, we idle here until thats done.
    ++cleanup_idlers_;
    cleanup_cv_.Broadcast();
    while (cleanup_state_ != CLEANUP_FINISHING) {
      cleanup_cv_.Wait();
    }
    --cleanup_idlers_;
    cleanup_cv_.Broadcast();
    return;
  }
  if (cleanup_state_ == CLEANUP_FINISHING) {
    // We wait for all idlers to wake up prior to being DONE.
    while (cleanup_idlers_ != 0) {
      cleanup_cv_.Broadcast();
      cleanup_cv_.Wait();
    }
    if (cleanup_state_ == CLEANUP_FINISHING) {
      cleanup_state_ = CLEANUP_DONE;
      cleanup_cv_.Signal();
    }
    return;
  }
}

int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
    const std::string& name) {
  lock_.AssertAcquired();
  DCHECK(!name.empty());

  std::map<std::string, int>::const_iterator found =
      named_sequence_tokens_.find(name);
  if (found != named_sequence_tokens_.end())
    return found->second;  // Got an existing one.

  // Create a new one for this name.
  SequenceToken result = GetSequenceToken();
  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
  return result.id_;
}

int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
  lock_.AssertAcquired();
  // We assume that we never create enough tasks to wrap around.
  return next_sequence_task_number_++;
}

SequencedWorkerPool::WorkerShutdown
SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
  lock_.AssertAcquired();
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  if (found == threads_.end())
    return CONTINUE_ON_SHUTDOWN;
  return found->second->running_shutdown_behavior();
}

SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
                           static_cast<int>(pending_tasks_.size()));
#endif

  // Find the next task with a sequence token that's not currently in use.
  // If the token is in use, that means another thread is running something
  // in that sequence, and we can't run it without going out-of-order.
  //
  // This algorithm is simple and fair, but inefficient in some cases. For
  // example, say somebody schedules 1000 slow tasks with the same sequence
  // number. We'll have to go through all those tasks each time we feel like
  // there might be work to schedule. If this proves to be a problem, we
  // should make this more efficient.
  //
  // One possible enhancement would be to keep a map from sequence ID to a
  // list of pending but currently blocked SequencedTasks for that ID.
  // When a worker finishes a task of one sequence token, it can pick up the
  // next one from that token right away.
  //
  // This may lead to starvation if there are sufficient numbers of sequences
  // in use. To alleviate this, we could add an incrementing priority counter
  // to each SequencedTask. Then maintain a priority_queue of all runnable
  // tasks, sorted by priority counter. When a sequenced task is completed
  // we would pop the head element off of that tasks pending list and add it
  // to the priority queue. Then we would run the first item in the priority
  // queue.

  GetWorkStatus status = GET_WORK_NOT_FOUND;
  int unrunnable_tasks = 0;
  PendingTaskSet::iterator i = pending_tasks_.begin();
  // We assume that the loop below doesn't take too long and so we can just do
  // a single call to TimeTicks::Now().
  const TimeTicks current_time = TimeTicks::Now();
  while (i != pending_tasks_.end()) {
    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
      unrunnable_tasks++;
      ++i;
      continue;
    }

    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
      // We're shutting down and the task we just found isn't blocking
      // shutdown. Delete it and get more work.
      //
      // Note that we do not want to delete unrunnable tasks. Deleting a task
      // can have side effects (like freeing some objects) and deleting a
      // task that's supposed to run after one that's currently running could
      // cause an obscure crash.
      //
      // We really want to delete these tasks outside the lock in case the
      // closures are holding refs to objects that want to post work from
      // their destructorss (which would deadlock). The closures are
      // internally refcounted, so we just need to keep a copy of them alive
      // until the lock is exited. The calling code can just clear() the
      // vector they passed to us once the lock is exited to make this
      // happen.
      delete_these_outside_lock->push_back(i->task);
      pending_tasks_.erase(i++);
      continue;
    }

    if (i->time_to_run > current_time) {
      // The time to run has not come yet.
      *wait_time = i->time_to_run - current_time;
      status = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
        delete_these_outside_lock->push_back(i->task);
        pending_tasks_.erase(i);
      }
      break;
    }

    // Found a runnable task.
    *task = *i;
    pending_tasks_.erase(i);
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      blocking_shutdown_pending_task_count_--;
    }

    status = GET_WORK_FOUND;
    break;
  }

  // Track the number of tasks we had to skip over to see if we should be
  // making this more efficient. If this number ever becomes large or is
  // frequently "some", we should consider the optimization above.
#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
                           unrunnable_tasks);
#endif
  return status;
}

int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  // Mark the task's sequence number as in use.
  if (task.sequence_token_id)
    current_sequences_.insert(task.sequence_token_id);

  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
  // completes.
  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
    blocking_shutdown_thread_count_++;

  // We just picked up a task. Since StartAdditionalThreadIfHelpful only
  // creates a new thread if there is no free one, there is a race when posting
  // tasks that many tasks could have been posted before a thread started
  // running them, so only one thread would have been created. So we also check
  // whether we should create more threads after removing our task from the
  // queue, which also has the nice side effect of creating the workers from
  // background threads rather than the main thread of the app.
  //
  // If another thread wasn't created, we want to wake up an existing thread
  // if there is one waiting to pick up the next task.
  //
  // Note that we really need to do this *before* running the task, not
  // after. Otherwise, if more than one task is posted, the creation of the
  // second thread (since we only create one at a time) will be blocked by
  // the execution of the first task, which could be arbitrarily long.
  return PrepareToStartAdditionalThreadIfHelpful();
}

void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
    blocking_shutdown_thread_count_--;
  }

  if (task.sequence_token_id)
    current_sequences_.erase(task.sequence_token_id);
}

bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
    int sequence_token_id) const {
  lock_.AssertAcquired();
  return !sequence_token_id ||
      current_sequences_.find(sequence_token_id) ==
          current_sequences_.end();
}

int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
  lock_.AssertAcquired();
  // How thread creation works:
  //
  // We'de like to avoid creating threads with the lock held. However, we
  // need to be sure that we have an accurate accounting of the threads for
  // proper Joining and deltion on shutdown.
  //
  // We need to figure out if we need another thread with the lock held, which
  // is what this function does. It then marks us as in the process of creating
  // a thread. When we do shutdown, we wait until the thread_being_created_
  // flag is cleared, which ensures that the new thread is properly added to
  // all the data structures and we can't leak it. Once shutdown starts, we'll
  // refuse to create more threads or they would be leaked.
  //
  // Note that this creates a mostly benign race condition on shutdown that
  // will cause fewer workers to be created than one would expect. It isn't
  // much of an issue in real life, but affects some tests. Since we only spawn
  // one worker at a time, the following sequence of events can happen:
  //
  //  1. Main thread posts a bunch of unrelated tasks that would normally be
  //     run on separate threads.
  //  2. The first task post causes us to start a worker. Other tasks do not
  //     cause a worker to start since one is pending.
  //  3. Main thread initiates shutdown.
  //  4. No more threads are created since the shutdown_called_ flag is set.
  //
  // The result is that one may expect that max_threads_ workers to be created
  // given the workload, but in reality fewer may be created because the
  // sequence of thread creation on the background threads is racing with the
  // shutdown call.
  if (!shutdown_called_ &&
      !thread_being_created_ &&
      cleanup_state_ == CLEANUP_DONE &&
      threads_.size() < max_threads_ &&
      waiting_thread_count_ == 0) {
    // We could use an additional thread if there's work to be done.
    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
         i != pending_tasks_.end(); ++i) {
      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
        // Found a runnable task, mark the thread as being started.
        thread_being_created_ = true;
        return static_cast<int>(threads_.size() + 1);
      }
    }
  }
  return 0;
}

void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
    int thread_number) {
  // Called outside of the lock.
  DCHECK(thread_number > 0);

  // The worker is assigned to the list when the thread actually starts, which
  // will manage the memory of the pointer.
  new Worker(worker_pool_, thread_number, thread_name_prefix_);
}

void SequencedWorkerPool::Inner::SignalHasWork() {
  has_work_cv_.Signal();
  if (testing_observer_) {
    testing_observer_->OnHasWork();
  }
}

bool SequencedWorkerPool::Inner::CanShutdown() const {
  lock_.AssertAcquired();
  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
  return !thread_being_created_ &&
         blocking_shutdown_thread_count_ == 0 &&
         blocking_shutdown_pending_task_count_ == 0;
}

base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;

// SequencedWorkerPool --------------------------------------------------------

// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
  // Don't construct lazy instance on check.
  if (g_lazy_tls_ptr == NULL)
    return SequenceToken();

  SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
  if (!token)
    return SequenceToken();
  return *token;
}

SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}

SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}

SequencedWorkerPool::~SequencedWorkerPool() {}

void SequencedWorkerPool::OnDestruct() const {
  DCHECK(constructor_message_loop_.get());
  // Avoid deleting ourselves on a worker thread (which would
  // deadlock).
  if (RunsTasksOnCurrentThread()) {
    constructor_message_loop_->DeleteSoon(FROM_HERE, this);
  } else {
    delete this;
  }
}

SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
  return inner_->GetSequenceToken();
}

SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
    const std::string& name) {
  return inner_->GetNamedSequenceToken(name);
}

scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
    SequenceToken token) {
  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
}

scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
    SequenceToken token, WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolSequencedTaskRunner(
      this, token, shutdown_behavior);
}

scoped_refptr<TaskRunner>
SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
    WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
}

bool SequencedWorkerPool::PostWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
    const std::string& token_name,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  DCHECK(!token_name.empty());
  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  return PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
  return inner_->RunsTasksOnCurrentThread();
}

bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
}

void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}

void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}

void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}

bool SequencedWorkerPool::IsShutdownInProgress() {
  return inner_->IsShutdownInProgress();
}

}  // namespace base