// Plain text  |  255 lines  |  7.32 KB

/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "perfetto/base/unix_task_runner.h"

#include "perfetto/base/build_config.h"

#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

#include <limits>

namespace perfetto {
namespace base {

// Creates the wake-up pipe, configures both of its ends and registers the read
// end as a watched file descriptor.
UnixTaskRunner::UnixTaskRunner() {
  // Self-pipe: a byte written to one end interrupts a poll(2) that is waiting
  // on the other end.
  int wakeup_pipe[2];
  PERFETTO_CHECK(pipe(wakeup_pipe) == 0);

  // Both ends become non-blocking, so that scheduling a wake-up never stalls
  // the calling thread, and are flagged close-on-exec.
  for (int end = 0; end < 2; ++end) {
    const int fd = wakeup_pipe[end];
    const int current_flags = fcntl(fd, F_GETFL, 0);
    PERFETTO_CHECK(current_flags != -1);
    PERFETTO_CHECK(fcntl(fd, F_SETFL, current_flags | O_NONBLOCK) == 0);
    PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
  }
  control_read_.reset(wakeup_pipe[0]);
  control_write_.reset(wakeup_pipe[1]);

#if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX)
  // Only a handful of bytes are ever expected to sit in the wake-up pipe, so
  // ask Linux for the smallest buffer (the kernel rounds the request up to the
  // page size).
  PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
#endif

  // The callback is a placeholder: PostFileDescriptorWatches() handles the
  // wake-up fd inline and never posts this task.
  AddFileDescriptorWatch(control_read_.get(), [] {
    PERFETTO_DCHECK(false);
  });
}

// Defaulted: no explicit teardown is performed here; every member is destroyed
// by its own destructor.
UnixTaskRunner::~UnixTaskRunner() = default;

// Writes a single byte to the wake-up pipe. A failed write is logged unless
// errno is EAGAIN.
void UnixTaskRunner::WakeUp() {
  const char wakeup_byte = 'P';
  const auto written = write(control_write_.get(), &wakeup_byte, 1);
  if (written > 0)
    return;
  if (errno != EAGAIN)
    PERFETTO_DPLOG("write()");
}

void UnixTaskRunner::Run() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  quit_ = false;
  while (true) {
    int poll_timeout_ms;
    {
      std::lock_guard<std::mutex> lock(lock_);
      if (quit_)
        return;
      poll_timeout_ms = GetDelayMsToNextTaskLocked();
      UpdateWatchTasksLocked();
    }
    int ret = PERFETTO_EINTR(poll(
        &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
    PERFETTO_CHECK(ret >= 0);

    // To avoid starvation we always interleave all types of tasks -- immediate,
    // delayed and file descriptor watches.
    PostFileDescriptorWatches();
    RunImmediateAndDelayedTask();
  }
}

void UnixTaskRunner::Quit() {
  {
    std::lock_guard<std::mutex> lock(lock_);
    quit_ = true;
  }
  WakeUp();
}

bool UnixTaskRunner::IsIdleForTesting() {
  std::lock_guard<std::mutex> lock(lock_);
  return immediate_tasks_.empty();
}

// Rebuilds |poll_fds_| from |watch_tasks_| if the set of watches changed since
// the last rebuild, recording in each watch the index of its poll entry.
void UnixTaskRunner::UpdateWatchTasksLocked() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (watch_tasks_changed_) {
    watch_tasks_changed_ = false;
    poll_fds_.clear();
    for (auto& watch : watch_tasks_) {
      // The new entry lands at the current end of the vector.
      watch.second.poll_fd_index = poll_fds_.size();
      poll_fds_.push_back({watch.first, POLLIN | POLLHUP, 0});
    }
  }
}

void UnixTaskRunner::RunImmediateAndDelayedTask() {
  // TODO(skyostil): Add a separate work queue in case in case locking overhead
  // becomes an issue.
  std::function<void()> immediate_task;
  std::function<void()> delayed_task;
  TimeMillis now = GetWallTimeMs();
  {
    std::lock_guard<std::mutex> lock(lock_);
    if (!immediate_tasks_.empty()) {
      immediate_task = std::move(immediate_tasks_.front());
      immediate_tasks_.pop_front();
    }
    if (!delayed_tasks_.empty()) {
      auto it = delayed_tasks_.begin();
      if (now >= it->first) {
        delayed_task = std::move(it->second);
        delayed_tasks_.erase(it);
      }
    }
  }

  errno = 0;
  if (immediate_task)
    RunTask(immediate_task);
  errno = 0;
  if (delayed_task)
    RunTask(delayed_task);
}

void UnixTaskRunner::PostFileDescriptorWatches() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (size_t i = 0; i < poll_fds_.size(); i++) {
    if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
      continue;
    poll_fds_[i].revents = 0;

    // The wake-up event is handled inline to avoid an infinite recursion of
    // posted tasks.
    if (poll_fds_[i].fd == control_read_.get()) {
      // Drain the byte(s) written to the wake-up pipe. We can potentially read
      // more than one byte if several wake-ups have been scheduled.
      char buffer[16];
      if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
          errno != EAGAIN) {
        PERFETTO_DPLOG("read()");
      }
      continue;
    }

    // Binding to |this| is safe since we are the only object executing the
    // task.
    PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
                       poll_fds_[i].fd));

    // Make the fd negative while a posted task is pending. This makes poll(2)
    // ignore the fd.
    PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
    poll_fds_[i].fd = -poll_fds_[i].fd;
  }
}

void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
  std::function<void()> task;
  {
    std::lock_guard<std::mutex> lock(lock_);
    auto it = watch_tasks_.find(fd);
    if (it == watch_tasks_.end())
      return;
    // Make poll(2) pay attention to the fd again. Since another thread may have
    // updated this watch we need to refresh the set first.
    UpdateWatchTasksLocked();
    size_t fd_index = it->second.poll_fd_index;
    PERFETTO_DCHECK(fd_index < poll_fds_.size());
    PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
    poll_fds_[fd_index].fd = fd;
    task = it->second.callback;
  }
  errno = 0;
  RunTask(task);
}

int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!immediate_tasks_.empty())
    return 0;
  if (!delayed_tasks_.empty()) {
    TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
    return std::max(0, static_cast<int>(diff.count()));
  }
  return -1;
}

void UnixTaskRunner::PostTask(std::function<void()> task) {
  bool was_empty;
  {
    std::lock_guard<std::mutex> lock(lock_);
    was_empty = immediate_tasks_.empty();
    immediate_tasks_.push_back(std::move(task));
  }
  if (was_empty)
    WakeUp();
}

void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
                                     uint32_t delay_ms) {
  TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
  {
    std::lock_guard<std::mutex> lock(lock_);
    delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
  }
  WakeUp();
}

void UnixTaskRunner::AddFileDescriptorWatch(int fd,
                                            std::function<void()> task) {
  PERFETTO_DCHECK(fd >= 0);
  {
    std::lock_guard<std::mutex> lock(lock_);
    PERFETTO_DCHECK(!watch_tasks_.count(fd));
    watch_tasks_[fd] = {std::move(task), SIZE_MAX};
    watch_tasks_changed_ = true;
  }
  WakeUp();
}

void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
  PERFETTO_DCHECK(fd >= 0);
  {
    std::lock_guard<std::mutex> lock(lock_);
    PERFETTO_DCHECK(watch_tasks_.count(fd));
    watch_tasks_.erase(fd);
    watch_tasks_changed_ = true;
  }
  // No need to schedule a wake-up for this.
}

}  // namespace base
}  // namespace perfetto