/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdint.h>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <android-base/logging.h>
#include <android-base/stringprintf.h>
#include <android-base/thread_annotations.h>
#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "sysdeps.h"
#include "transport.h"
#include "types.h"
static void CreateWakeFds(unique_fd* read, unique_fd* write) {
// TODO: eventfd on linux?
int wake_fds[2];
int rc = adb_socketpair(wake_fds);
set_file_block_mode(wake_fds[0], false);
set_file_block_mode(wake_fds[1], false);
CHECK_EQ(0, rc);
*read = unique_fd(wake_fds[0]);
*write = unique_fd(wake_fds[1]);
}
struct NonblockingFdConnection : public Connection {
NonblockingFdConnection(unique_fd fd) : started_(false), fd_(std::move(fd)) {
set_file_block_mode(fd_.get(), false);
CreateWakeFds(&wake_fd_read_, &wake_fd_write_);
}
void SetRunning(bool value) {
std::lock_guard<std::mutex> lock(run_mutex_);
running_ = value;
}
bool IsRunning() {
std::lock_guard<std::mutex> lock(run_mutex_);
return running_;
}
void Run(std::string* error) {
SetRunning(true);
while (IsRunning()) {
adb_pollfd pfds[2] = {
{.fd = fd_.get(), .events = POLLIN},
{.fd = wake_fd_read_.get(), .events = POLLIN},
};
{
std::lock_guard<std::mutex> lock(this->write_mutex_);
if (!writable_) {
pfds[0].events |= POLLOUT;
}
}
int rc = adb_poll(pfds, 2, -1);
if (rc == -1) {
*error = android::base::StringPrintf("poll failed: %s", strerror(errno));
return;
} else if (rc == 0) {
LOG(FATAL) << "poll timed out with an infinite timeout?";
}
if (pfds[0].revents) {
if ((pfds[0].revents & POLLOUT)) {
std::lock_guard<std::mutex> lock(this->write_mutex_);
if (DispatchWrites() == WriteResult::Error) {
*error = "write failed";
return;
}
}
if (pfds[0].revents & POLLIN) {
// TODO: Should we be getting blocks from a free list?
auto block = std::make_unique<IOVector::block_type>(MAX_PAYLOAD);
rc = adb_read(fd_.get(), &(*block)[0], block->size());
if (rc == -1) {
*error = std::string("read failed: ") + strerror(errno);
return;
} else if (rc == 0) {
*error = "read failed: EOF";
return;
}
block->resize(rc);
read_buffer_.append(std::move(block));
if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) {
auto header_buf = read_buffer_.take_front(sizeof(amessage)).coalesce();
CHECK_EQ(sizeof(amessage), header_buf.size());
read_header_ = std::make_unique<amessage>();
memcpy(read_header_.get(), header_buf.data(), sizeof(amessage));
}
if (read_header_ && read_buffer_.size() >= read_header_->data_length) {
auto data_chain = read_buffer_.take_front(read_header_->data_length);
// TODO: Make apacket carry around a IOVector instead of coalescing.
auto payload = data_chain.coalesce<apacket::payload_type>();
auto packet = std::make_unique<apacket>();
packet->msg = *read_header_;
packet->payload = std::move(payload);
read_header_ = nullptr;
read_callback_(this, std::move(packet));
}
}
}
if (pfds[1].revents) {
uint64_t buf;
rc = adb_read(wake_fd_read_.get(), &buf, sizeof(buf));
CHECK_EQ(static_cast<int>(sizeof(buf)), rc);
// We were woken up either to add POLLOUT to our events, or to exit.
// Do nothing.
}
}
}
void Start() override final {
if (started_.exchange(true)) {
LOG(FATAL) << "Connection started multiple times?";
}
thread_ = std::thread([this]() {
std::string error = "connection closed";
Run(&error);
this->error_callback_(this, error);
});
}
void Stop() override final {
SetRunning(false);
WakeThread();
thread_.join();
}
void WakeThread() {
uint64_t buf = 0;
if (TEMP_FAILURE_RETRY(adb_write(wake_fd_write_.get(), &buf, sizeof(buf))) != sizeof(buf)) {
LOG(FATAL) << "failed to wake up thread";
}
}
enum class WriteResult {
Error,
Completed,
TryAgain,
};
WriteResult DispatchWrites() REQUIRES(write_mutex_) {
CHECK(!write_buffer_.empty());
auto iovs = write_buffer_.iovecs();
ssize_t rc = adb_writev(fd_.get(), iovs.data(), iovs.size());
if (rc == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
writable_ = false;
return WriteResult::TryAgain;
}
return WriteResult::Error;
} else if (rc == 0) {
errno = 0;
return WriteResult::Error;
}
// TODO: Implement a more efficient drop_front?
write_buffer_.take_front(rc);
writable_ = write_buffer_.empty();
if (write_buffer_.empty()) {
return WriteResult::Completed;
}
// There's data left in the range, which means our write returned early.
return WriteResult::TryAgain;
}
bool Write(std::unique_ptr<apacket> packet) final {
std::lock_guard<std::mutex> lock(write_mutex_);
const char* header_begin = reinterpret_cast<const char*>(&packet->msg);
const char* header_end = header_begin + sizeof(packet->msg);
auto header_block = std::make_unique<IOVector::block_type>(header_begin, header_end);
write_buffer_.append(std::move(header_block));
if (!packet->payload.empty()) {
write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload)));
}
WriteResult result = DispatchWrites();
if (result == WriteResult::TryAgain) {
WakeThread();
}
return result != WriteResult::Error;
}
std::thread thread_;
std::atomic<bool> started_;
std::mutex run_mutex_;
bool running_ GUARDED_BY(run_mutex_);
std::unique_ptr<amessage> read_header_;
IOVector read_buffer_;
unique_fd fd_;
unique_fd wake_fd_read_;
unique_fd wake_fd_write_;
std::mutex write_mutex_;
bool writable_ GUARDED_BY(write_mutex_) = true;
IOVector write_buffer_ GUARDED_BY(write_mutex_);
IOVector incoming_queue_;
};
std::unique_ptr<Connection> Connection::FromFd(unique_fd fd) {
return std::make_unique<NonblockingFdConnection>(std::move(fd));
}