普通文本  |  108行  |  2.92 KB

// Copyright 2015 The Weave Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "examples/provider/event_task_runner.h"

#include <signal.h>

namespace weave {
namespace examples {

namespace {
event_base* g_event_base = nullptr;
}

void EventTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  base::Time new_time = base::Time::Now() + delay;
  if (queue_.empty() || new_time < queue_.top().first.first) {
    ReScheduleEvent(delay);
  }
  queue_.emplace(std::make_pair(new_time, ++counter_), task);
}

void EventTaskRunner::AddIoCompletionTask(
    int fd,
    int16_t what,
    const EventTaskRunner::IoCompletionCallback& task) {
  int16_t flags = EV_PERSIST | EV_ET;
  flags |= (what & kReadable) ? EV_READ : 0;
  flags |= (what & kWriteable) ? EV_WRITE : 0;
#if LIBEVENT_VERSION_NUMBER >= 0x02010400
  flags |= (what & kClosed) ? EV_CLOSED : 0;
#endif
  event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this);
  EventPtr<event> ioeventPtr{ioevent};
  fd_task_map_.insert(
      std::make_pair(fd, std::make_pair(std::move(ioeventPtr), task)));
  event_add(ioevent, nullptr);
}

void EventTaskRunner::RemoveIoCompletionTask(int fd) {
  fd_task_map_.erase(fd);
}

void EventTaskRunner::Run() {
  g_event_base = base_.get();

  struct sigaction sa = {};
  sa.sa_handler = [](int signal) {
    event_base_loopexit(g_event_base, nullptr);
  };
  sigfillset(&sa.sa_mask);
  sigaction(SIGINT, &sa, nullptr);

  do {
    event_base_loop(g_event_base, EVLOOP_ONCE);
  } while (!event_base_got_exit(g_event_base));
  g_event_base = nullptr;
}

void EventTaskRunner::ReScheduleEvent(base::TimeDelta delay) {
  timespec ts = delay.ToTimeSpec();
  timeval tv = {ts.tv_sec, ts.tv_nsec / 1000};
  event_add(task_event_.get(), &tv);
}

void EventTaskRunner::EventHandler(int /* fd */,
                                   int16_t /* what */,
                                   void* runner) {
  static_cast<EventTaskRunner*>(runner)->Process();
}

void EventTaskRunner::FreeEvent(event* evnt) {
  event_del(evnt);
  event_free(evnt);
}

void EventTaskRunner::Process() {
  while (!queue_.empty() && queue_.top().first.first <= base::Time::Now()) {
    auto cb = queue_.top().second;
    queue_.pop();
    cb.Run();
  }
  if (!queue_.empty()) {
    base::TimeDelta delta = std::max(
        base::TimeDelta(), queue_.top().first.first - base::Time::Now());
    ReScheduleEvent(delta);
  }
}

void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) {
  static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what);
}

void EventTaskRunner::ProcessFd(int fd, int16_t what) {
  auto it = fd_task_map_.find(fd);
  if (it != fd_task_map_.end()) {
    const IoCompletionCallback& callback = it->second.second;
    callback.Run(fd, what, this);
  }
}

}  // namespace examples
}  // namespace weave