// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/base/plugin_thread_task_runner.h"
#include "base/bind.h"
namespace {
base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
return std::max(when - base::TimeTicks::Now(), base::TimeDelta());
}
} // namespace
namespace remoting {
PluginThreadTaskRunner::Delegate::~Delegate() {
}
PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
: plugin_thread_id_(base::PlatformThread::CurrentId()),
event_(false, false),
delegate_(delegate),
next_sequence_num_(0),
quit_received_(false),
stopped_(false) {
}
PluginThreadTaskRunner::~PluginThreadTaskRunner() {
DCHECK(delegate_ == NULL);
DCHECK(stopped_);
}
void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
DCHECK(BelongsToCurrentThread());
// Detach from the plugin thread and redirect all tasks posted after this
// point to the shutdown task loop.
{
base::AutoLock auto_lock(lock_);
DCHECK(delegate_ != NULL);
DCHECK(!stopped_);
delegate_ = NULL;
stopped_ = quit_received_;
}
// When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
// timers are cancelled. It is OK to clear |scheduled_timers_| even if
// the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
// called before NPP_Destroy()).
scheduled_timers_.clear();
// Run all tasks that are due.
ProcessIncomingTasks();
RunDueTasks(base::TimeTicks::Now());
while (!stopped_) {
if (delayed_queue_.empty()) {
event_.Wait();
} else {
event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
}
// Run all tasks that are due.
ProcessIncomingTasks();
RunDueTasks(base::TimeTicks::Now());
base::AutoLock auto_lock(lock_);
stopped_ = quit_received_;
}
}
void PluginThreadTaskRunner::Quit() {
base::AutoLock auto_lock(lock_);
if (!quit_received_) {
quit_received_ = true;
event_.Signal();
}
}
bool PluginThreadTaskRunner::PostDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
// Wrap the task into |base::PendingTask|.
base::TimeTicks delayed_run_time;
if (delay > base::TimeDelta()) {
delayed_run_time = base::TimeTicks::Now() + delay;
} else {
DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
}
base::PendingTask pending_task(from_here, task, delayed_run_time, false);
// Push the task to the incoming queue.
base::AutoLock locked(lock_);
// Initialize the sequence number. The sequence number provides FIFO ordering
// for tasks with the same |delayed_run_time|.
pending_task.sequence_num = next_sequence_num_++;
// Post an asynchronous call on the plugin thread to process the task.
if (incoming_queue_.empty()) {
PostRunTasks();
}
incoming_queue_.push(pending_task);
pending_task.task.Reset();
// No tasks should be posted after Quit() has been called.
DCHECK(!quit_received_);
return true;
}
bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
// All tasks running on this task loop are non-nestable.
return PostDelayedTask(from_here, task, delay);
}
bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
// In pepper plugins ideally we should use pp::Core::IsMainThread,
// but it is problematic because we would need to keep reference to
// Core somewhere, e.g. make the delegate ref-counted.
return base::PlatformThread::CurrentId() == plugin_thread_id_;
}
void PluginThreadTaskRunner::PostRunTasks() {
// Post tasks to the plugin thread when it is availabe or spin the shutdown
// task loop.
if (delegate_ != NULL) {
base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
delegate_->RunOnPluginThread(
base::TimeDelta(),
&PluginThreadTaskRunner::TaskSpringboard,
new base::Closure(closure));
} else {
event_.Signal();
}
}
void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
DCHECK(BelongsToCurrentThread());
// |delegate_| is updated from the plugin thread only, so it is safe to access
// it here without taking the lock.
if (delegate_ != NULL) {
// Schedule RunDelayedTasks() to be called at |when| if it hasn't been
// scheduled already.
if (scheduled_timers_.insert(when).second) {
base::TimeDelta delay = CalcTimeDelta(when);
base::Closure closure =
base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
delegate_->RunOnPluginThread(
delay,
&PluginThreadTaskRunner::TaskSpringboard,
new base::Closure(closure));
}
} else {
// Spin the shutdown loop if the task runner has already been detached.
// The shutdown loop will pick the tasks to run itself.
event_.Signal();
}
}
void PluginThreadTaskRunner::ProcessIncomingTasks() {
DCHECK(BelongsToCurrentThread());
// Grab all unsorted tasks accomulated so far.
base::TaskQueue work_queue;
{
base::AutoLock locked(lock_);
incoming_queue_.Swap(&work_queue);
}
while (!work_queue.empty()) {
base::PendingTask pending_task = work_queue.front();
work_queue.pop();
if (pending_task.delayed_run_time.is_null()) {
pending_task.task.Run();
} else {
delayed_queue_.push(pending_task);
}
}
}
void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
DCHECK(BelongsToCurrentThread());
scheduled_timers_.erase(when);
// |stopped_| is updated by the plugin thread only, so it is safe to access
// it here without taking the lock.
if (!stopped_) {
ProcessIncomingTasks();
RunDueTasks(base::TimeTicks::Now());
}
}
void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
DCHECK(BelongsToCurrentThread());
// Run all due tasks.
while (!delayed_queue_.empty() &&
delayed_queue_.top().delayed_run_time <= now) {
delayed_queue_.top().task.Run();
delayed_queue_.pop();
}
// Post a delayed asynchronous call to the plugin thread to process tasks from
// the delayed queue.
if (!delayed_queue_.empty()) {
base::TimeTicks when = delayed_queue_.top().delayed_run_time;
if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) {
PostDelayedRunTasks(when);
}
}
}
void PluginThreadTaskRunner::RunTasks() {
DCHECK(BelongsToCurrentThread());
// |stopped_| is updated by the plugin thread only, so it is safe to access
// it here without taking the lock.
if (!stopped_) {
ProcessIncomingTasks();
RunDueTasks(base::TimeTicks::Now());
}
}
// static
void PluginThreadTaskRunner::TaskSpringboard(void* data) {
base::Closure* task = reinterpret_cast<base::Closure*>(data);
task->Run();
delete task;
}
} // namespace remoting