#include <pdx/service_dispatcher.h>

#include <errno.h>
#include <log/log.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#include <pdx/service.h>
#include <pdx/service_endpoint.h>

static const int kMaxEventsPerLoop = 128;

namespace android {
namespace pdx {

std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() {
  std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
  if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
    dispatcher.reset();
  }

  return dispatcher;
}

ServiceDispatcher::ServiceDispatcher() {
  event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
  if (!event_fd_) {
    ALOGE("Failed to create event fd because: %s\n", strerror(errno));
    return;
  }

  epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
  if (!epoll_fd_) {
    ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
    return;
  }

  // Use "this" as a unique pointer to distinguish the event fd from all
  // the other entries that point to instances of Service.
  epoll_event event;
  event.events = EPOLLIN;
  event.data.ptr = this;

  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
    ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));

    // Close the fds here and signal failure to the factory method.
    event_fd_.Close();
    epoll_fd_.Close();
  }
}

ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }

int ServiceDispatcher::ThreadEnter() {
  std::lock_guard<std::mutex> autolock(mutex_);

  if (canceled_)
    return -EBUSY;

  thread_count_++;
  return 0;
}

void ServiceDispatcher::ThreadExit() {
  std::lock_guard<std::mutex> autolock(mutex_);
  thread_count_--;
  condition_.notify_one();
}

int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
  std::lock_guard<std::mutex> autolock(mutex_);

  epoll_event event;
  event.events = EPOLLIN;
  event.data.ptr = service.get();

  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(),
                &event) < 0) {
    ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
    return -errno;
  }

  services_.push_back(service);
  return 0;
}

int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
  std::lock_guard<std::mutex> autolock(mutex_);

  // It's dangerous to remove a service while other threads may be using it.
  if (thread_count_ > 0)
    return -EBUSY;

  epoll_event dummy;  // See BUGS in man 2 epoll_ctl.
  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(),
                &dummy) < 0) {
    ALOGE("Failed to remove service from dispatcher because: %s\n",
          strerror(errno));
    return -errno;
  }

  services_.erase(std::remove(services_.begin(), services_.end(), service),
                  services_.end());
  return 0;
}

int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }

int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
  int ret = ThreadEnter();
  if (ret < 0)
    return ret;

  epoll_event events[kMaxEventsPerLoop];

  int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
  if (count <= 0) {
    ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
             strerror(errno));
    ThreadExit();
    return count < 0 ? -errno : -ETIMEDOUT;
  }

  for (int i = 0; i < count; i++) {
    if (events[i].data.ptr == this) {
      ThreadExit();
      return -EBUSY;
    } else {
      Service* service = static_cast<Service*>(events[i].data.ptr);

      ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
               service->endpoint()->epoll_fd());
      service->ReceiveAndDispatch();
    }
  }

  ThreadExit();
  return 0;
}

int ServiceDispatcher::EnterDispatchLoop() {
  int ret = ThreadEnter();
  if (ret < 0)
    return ret;

  epoll_event events[kMaxEventsPerLoop];

  while (!IsCanceled()) {
    int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
    if (count < 0 && errno != EINTR) {
      ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
      ThreadExit();
      return -errno;
    }

    for (int i = 0; i < count; i++) {
      if (events[i].data.ptr == this) {
        ThreadExit();
        return -EBUSY;
      } else {
        Service* service = static_cast<Service*>(events[i].data.ptr);

        ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
                 service->endpoint()->epoll_fd());
        service->ReceiveAndDispatch();
      }
    }
  }

  ThreadExit();
  return 0;
}

void ServiceDispatcher::SetCanceled(bool cancel) {
  std::unique_lock<std::mutex> lock(mutex_);
  canceled_ = cancel;

  if (canceled_ && thread_count_ > 0) {
    eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit.

    condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });

    eventfd_t value;
    eventfd_read(event_fd_.Get(), &value);  // Unsignal.
  }
}

bool ServiceDispatcher::IsCanceled() const { return canceled_; }

}  // namespace pdx
}  // namespace android