普通文本  |  248行  |  6.89 KB

// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "remoting/base/plugin_thread_task_runner.h"

#include <algorithm>

#include "base/bind.h"

namespace {

// Returns how long to wait from now until |when|. If |when| is already in the
// past the result is clamped to a default-constructed base::TimeDelta instead
// of going negative.
base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
  const base::TimeDelta until_due = when - base::TimeTicks::Now();
  const base::TimeDelta no_delay = base::TimeDelta();
  return std::max(until_due, no_delay);
}

}  // namespace

namespace remoting {

// Out-of-line definition of the delegate's destructor; there is nothing to
// clean up.
PluginThreadTaskRunner::Delegate::~Delegate() {}

// Creates a task runner bound to the calling thread: the id of the thread that
// runs this constructor is recorded as the plugin thread and is what
// RunsTasksOnCurrentThread() later compares against.
//
// |delegate| is stored as-is and used to schedule callbacks on the plugin
// thread until DetachAndRunShutdownLoop() resets it to NULL.
PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
    : plugin_thread_id_(base::PlatformThread::CurrentId()),
      // NOTE(review): presumably (manual_reset = false,
      // initially_signaled = false), i.e. an auto-reset event that starts
      // unsignaled -- confirm against base::WaitableEvent.
      event_(false, false),
      delegate_(delegate),
      next_sequence_num_(0),
      quit_received_(false),
      stopped_(false) {
}

// Destroys the task runner. By this point the runner must have been detached
// from its delegate (DetachAndRunShutdownLoop() resets |delegate_| to NULL)
// and the shutdown loop must have observed Quit() (which is what sets
// |stopped_|); both conditions are verified in debug builds.
PluginThreadTaskRunner::~PluginThreadTaskRunner() {
  DCHECK(delegate_ == NULL);
  DCHECK(stopped_);
}

// Detaches the task runner from its delegate and then runs a blocking task
// loop on the calling (plugin) thread until Quit() has been received.
//
// Must be called on the plugin thread, exactly once, while the delegate is
// still attached (all three conditions are DCHECKed). After |delegate_| has
// been cleared, PostRunTasks() wakes this loop through |event_| instead of
// asking the delegate to schedule a callback.
void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
  DCHECK(BelongsToCurrentThread());

  // Detach from the plugin thread and redirect all tasks posted after this
  // point to the shutdown task loop.
  {
    base::AutoLock auto_lock(lock_);

    DCHECK(delegate_ != NULL);
    DCHECK(!stopped_);

    delegate_ = NULL;
    // If Quit() was already called there is nothing to wait for: the loop
    // below is skipped and only the tasks that are due right now are run.
    stopped_ = quit_received_;
  }

  // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
  // timers are cancelled. It is OK to clear |scheduled_timers_| even if
  // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
  // called before NPP_Destroy()).
  scheduled_timers_.clear();

  // Run all tasks that are due.
  ProcessIncomingTasks();
  RunDueTasks(base::TimeTicks::Now());

  while (!stopped_) {
    // Sleep until something signals |event_| (Quit() or PostRunTasks() in this
    // file) or, when delayed tasks are pending, until the earliest one is due.
    if (delayed_queue_.empty()) {
      event_.Wait();
    } else {
      event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
    }

    // Run all tasks that are due.
    ProcessIncomingTasks();
    RunDueTasks(base::TimeTicks::Now());

    // |quit_received_| is written by Quit() under |lock_|, so it is sampled
    // under the same lock. The lock is held until the end of this iteration.
    base::AutoLock auto_lock(lock_);
    stopped_ = quit_received_;
  }
}

// Requests termination of the shutdown loop run by
// DetachAndRunShutdownLoop(). Only the first call has any effect: it records
// the request under |lock_| and signals |event_| so that a waiting shutdown
// loop wakes up and notices it. Later calls are no-ops.
void PluginThreadTaskRunner::Quit() {
  base::AutoLock auto_lock(lock_);

  if (quit_received_)
    return;

  quit_received_ = true;
  event_.Signal();
}

// Queues |task| for execution by this task runner after |delay|.
//
// |from_here| is the posting location recorded in the pending task. A zero
// |delay| leaves the task's run time null, which ProcessIncomingTasks() treats
// as "run immediately"; a negative |delay| trips a DCHECK. Tasks must not be
// posted once Quit() has been called (DCHECKed).
//
// Always returns true.
bool PluginThreadTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  // Only a strictly positive delay yields a concrete deadline; otherwise the
  // run time stays default-constructed.
  base::TimeTicks run_at;
  if (delay > base::TimeDelta()) {
    run_at = base::TimeTicks::Now() + delay;
  } else {
    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
  }

  base::PendingTask queued_task(from_here, task, run_at, false);

  // Everything below touches state shared with the plugin thread.
  base::AutoLock auto_lock(lock_);

  // Sequence numbers give FIFO ordering among tasks that share the same
  // |delayed_run_time|.
  queued_task.sequence_num = next_sequence_num_++;

  // A wake-up is requested only on the empty -> non-empty transition of the
  // incoming queue.
  const bool queue_was_empty = incoming_queue_.empty();
  if (queue_was_empty)
    PostRunTasks();

  incoming_queue_.push(queued_task);
  queued_task.task.Reset();

  // No tasks should be posted after Quit() has been called.
  DCHECK(!quit_received_);
  return true;
}

// Non-nestable variant of PostDelayedTask(). Every task on this task loop is
// already non-nestable, so the call is forwarded unchanged and its result is
// returned.
bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  const bool posted = PostDelayedTask(from_here, task, delay);
  return posted;
}

bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
  // In pepper plugins ideally we should use pp::Core::IsMainThread,
  // but it is problematic because we would need to keep reference to
  // Core somewhere, e.g. make the delegate ref-counted.
  return base::PlatformThread::CurrentId() == plugin_thread_id_;
}

void PluginThreadTaskRunner::PostRunTasks() {
  // Post tasks to the plugin thread when it is availabe or spin the shutdown
  // task loop.
  if (delegate_ != NULL) {
    base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
    delegate_->RunOnPluginThread(
        base::TimeDelta(),
        &PluginThreadTaskRunner::TaskSpringboard,
        new base::Closure(closure));
  } else {
    event_.Signal();
  }
}

// Arranges for delayed tasks to be serviced at |when|.
//
// While a delegate is attached, RunDelayedTasks(|when|) is scheduled on the
// plugin thread unless a timer for exactly |when| is already recorded in
// |scheduled_timers_|. Once detached, nothing needs to be scheduled (see
// below). Must be called on the plugin thread.
void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
  DCHECK(BelongsToCurrentThread());

  // |delegate_| is updated from the plugin thread only, so it is safe to access
  // it here without taking the lock.
  if (delegate_ == NULL) {
    // The task runner has been detached, so delayed tasks are serviced by the
    // loop in DetachAndRunShutdownLoop(), which computes its wait timeout from
    // |delayed_queue_| before every wait. That loop runs on the plugin thread,
    // i.e. on the very thread executing this function, so it cannot be blocked
    // on |event_| right now. Signalling |event_| here would only make its next
    // wait return immediately; because RunDueTasks() calls back into this
    // function on every iteration, that turned the shutdown loop into a busy
    // spin until the earliest delayed task became due.
    return;
  }

  // Schedule RunDelayedTasks() to be called at |when| if it hasn't been
  // scheduled already.
  if (!scheduled_timers_.insert(when).second)
    return;

  base::TimeDelta delay = CalcTimeDelta(when);
  base::Closure closure =
      base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
  // Ownership of the heap-allocated closure passes to TaskSpringboard(), which
  // runs and then deletes it.
  delegate_->RunOnPluginThread(
      delay,
      &PluginThreadTaskRunner::TaskSpringboard,
      new base::Closure(closure));
}

void PluginThreadTaskRunner::ProcessIncomingTasks() {
  DCHECK(BelongsToCurrentThread());

  // Grab all unsorted tasks accomulated so far.
  base::TaskQueue work_queue;
  {
    base::AutoLock locked(lock_);
    incoming_queue_.Swap(&work_queue);
  }

  while (!work_queue.empty()) {
    base::PendingTask pending_task = work_queue.front();
    work_queue.pop();

    if (pending_task.delayed_run_time.is_null()) {
      pending_task.task.Run();
    } else {
      delayed_queue_.push(pending_task);
    }
  }
}

// Timer callback scheduled by PostDelayedRunTasks() for the deadline |when|.
// Removes |when| from the set of outstanding timers and, unless the runner has
// been stopped, processes incoming tasks and runs the delayed tasks that are
// now due.
void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
  DCHECK(BelongsToCurrentThread());

  scheduled_timers_.erase(when);

  // |stopped_| is only ever written on the plugin thread, so reading it here
  // without the lock is safe.
  if (stopped_)
    return;

  ProcessIncomingTasks();
  RunDueTasks(base::TimeTicks::Now());
}

// Runs every task in |delayed_queue_| whose run time is at or before |now|,
// then makes sure the earliest remaining task (if any) gets serviced. Must be
// called on the plugin thread.
void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
  DCHECK(BelongsToCurrentThread());

  // Execute tasks from the top of the queue until one is not due yet.
  while (!delayed_queue_.empty()) {
    if (now < delayed_queue_.top().delayed_run_time)
      break;

    delayed_queue_.top().task.Run();
    delayed_queue_.pop();
  }

  if (delayed_queue_.empty())
    return;

  // Tasks remain. Ask for a delayed call to process them unless a timer that
  // fires no later than the next deadline is already scheduled.
  const base::TimeTicks next_due = delayed_queue_.top().delayed_run_time;
  const bool timer_needed =
      scheduled_timers_.empty() || next_due < *scheduled_timers_.begin();
  if (timer_needed)
    PostDelayedRunTasks(next_due);
}

// Callback scheduled by PostRunTasks(). Unless the runner has been stopped,
// processes incoming tasks and runs the delayed tasks that are due.
void PluginThreadTaskRunner::RunTasks() {
  DCHECK(BelongsToCurrentThread());

  // |stopped_| is only ever written on the plugin thread, so reading it here
  // without the lock is safe.
  if (stopped_)
    return;

  ProcessIncomingTasks();
  RunDueTasks(base::TimeTicks::Now());
}

// static
// C-style trampoline passed to Delegate::RunOnPluginThread(). |data| must be
// the heap-allocated base::Closure created by PostRunTasks() or
// PostDelayedRunTasks(); the closure is run once and then deleted here.
void PluginThreadTaskRunner::TaskSpringboard(void* data) {
  // void* -> object pointer is a static_cast conversion; reinterpret_cast is
  // not needed for it.
  base::Closure* task = static_cast<base::Closure*>(data);
  task->Run();
  delete task;
}

}  // namespace remoting