#include "uds/service_dispatcher.h"
#include <errno.h>
#include <log/log.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "pdx/service.h"
#include "uds/service_endpoint.h"
static const int kMaxEventsPerLoop = 128;
namespace android {
namespace pdx {
namespace uds {
std::unique_ptr<pdx::ServiceDispatcher> ServiceDispatcher::Create() {
std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
dispatcher.reset();
}
return std::move(dispatcher);
}
ServiceDispatcher::ServiceDispatcher() {
event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
if (!event_fd_) {
ALOGE("Failed to create event fd because: %s\n", strerror(errno));
return;
}
epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
if (!epoll_fd_) {
ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
return;
}
// Use "this" as a unique pointer to distinguish the event fd from all
// the other entries that point to instances of Service.
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = this;
if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
// Close the fds here and signal failure to the factory method.
event_fd_.Close();
epoll_fd_.Close();
}
}
ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
int ServiceDispatcher::ThreadEnter() {
std::lock_guard<std::mutex> autolock(mutex_);
if (canceled_)
return -EBUSY;
thread_count_++;
return 0;
}
void ServiceDispatcher::ThreadExit() {
std::lock_guard<std::mutex> autolock(mutex_);
thread_count_--;
condition_.notify_one();
}
int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
return -EINVAL;
std::lock_guard<std::mutex> autolock(mutex_);
auto* endpoint = static_cast<Endpoint*>(service->endpoint());
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = service.get();
if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, endpoint->epoll_fd(), &event) <
0) {
ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
return -errno;
}
services_.push_back(service);
return 0;
}
int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
return -EINVAL;
std::lock_guard<std::mutex> autolock(mutex_);
// It's dangerous to remove a service while other threads may be using it.
if (thread_count_ > 0)
return -EBUSY;
epoll_event dummy; // See BUGS in man 2 epoll_ctl.
auto* endpoint = static_cast<Endpoint*>(service->endpoint());
if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, endpoint->epoll_fd(), &dummy) <
0) {
ALOGE("Failed to remove service from dispatcher because: %s\n",
strerror(errno));
return -errno;
}
services_.remove(service);
return 0;
}
int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
int ret = ThreadEnter();
if (ret < 0)
return ret;
epoll_event events[kMaxEventsPerLoop];
int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
if (count <= 0) {
ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
strerror(errno));
ThreadExit();
return count < 0 ? -errno : -ETIMEDOUT;
}
for (int i = 0; i < count; i++) {
if (events[i].data.ptr == this) {
ThreadExit();
return -EBUSY;
} else {
Service* service = static_cast<Service*>(events[i].data.ptr);
ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
static_cast<Endpoint*>(service->endpoint())->epoll_fd());
service->ReceiveAndDispatch();
}
}
ThreadExit();
return 0;
}
int ServiceDispatcher::EnterDispatchLoop() {
int ret = ThreadEnter();
if (ret < 0)
return ret;
epoll_event events[kMaxEventsPerLoop];
while (!IsCanceled()) {
int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
if (count < 0 && errno != EINTR) {
ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
ThreadExit();
return -errno;
}
for (int i = 0; i < count; i++) {
if (events[i].data.ptr == this) {
ThreadExit();
return -EBUSY;
} else {
Service* service = static_cast<Service*>(events[i].data.ptr);
ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
static_cast<Endpoint*>(service->endpoint())->epoll_fd());
service->ReceiveAndDispatch();
}
}
}
ThreadExit();
return 0;
}
void ServiceDispatcher::SetCanceled(bool cancel) {
std::unique_lock<std::mutex> lock(mutex_);
canceled_ = cancel;
if (canceled_ && thread_count_ > 0) {
eventfd_write(event_fd_.Get(), 1); // Signal threads to quit.
condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
eventfd_t value;
eventfd_read(event_fd_.Get(), &value); // Unsignal.
}
}
bool ServiceDispatcher::IsCanceled() const { return canceled_; }
} // namespace uds
} // namespace pdx
} // namespace android