/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define TRACE_TAG USB
#include "sysdeps.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>
#include <linux/usb/functionfs.h>
#include <sys/eventfd.h>
#include <algorithm>
#include <array>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>
#include <asyncio/AsyncIO.h>
#include <android-base/logging.h>
#include <android-base/macros.h>
#include <android-base/properties.h>
#include <android-base/thread_annotations.h>
#include <adbd/usb.h>
#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "sysdeps/chrono.h"
#include "transport.h"
#include "types.h"
using android::base::StringPrintf;
// We can't find out whether we have support for AIO on ffs endpoints until we submit a read.
static std::optional<bool> gFfsAioSupported;
// Not all USB controllers support operations larger than 16k, so don't go above that.
// Also, each submitted operation does an allocation in the kernel of that size, so we want to
// minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
static constexpr size_t kUsbReadQueueDepth = 8;
static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE;
static constexpr size_t kUsbWriteQueueDepth = 8;
static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE;
static const char* to_string(enum usb_functionfs_event_type type) {
switch (type) {
case FUNCTIONFS_BIND:
return "FUNCTIONFS_BIND";
case FUNCTIONFS_UNBIND:
return "FUNCTIONFS_UNBIND";
case FUNCTIONFS_ENABLE:
return "FUNCTIONFS_ENABLE";
case FUNCTIONFS_DISABLE:
return "FUNCTIONFS_DISABLE";
case FUNCTIONFS_SETUP:
return "FUNCTIONFS_SETUP";
case FUNCTIONFS_SUSPEND:
return "FUNCTIONFS_SUSPEND";
case FUNCTIONFS_RESUME:
return "FUNCTIONFS_RESUME";
}
}
enum class TransferDirection : uint64_t {
READ = 0,
WRITE = 1,
};
struct TransferId {
TransferDirection direction : 1;
uint64_t id : 63;
TransferId() : TransferId(TransferDirection::READ, 0) {}
private:
TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {}
public:
explicit operator uint64_t() const {
uint64_t result;
static_assert(sizeof(*this) == sizeof(result));
memcpy(&result, this, sizeof(*this));
return result;
}
static TransferId read(uint64_t id) { return TransferId(TransferDirection::READ, id); }
static TransferId write(uint64_t id) { return TransferId(TransferDirection::WRITE, id); }
static TransferId from_value(uint64_t value) {
TransferId result;
memcpy(&result, &value, sizeof(value));
return result;
}
};
struct IoBlock {
bool pending = false;
struct iocb control = {};
std::shared_ptr<Block> payload;
TransferId id() const { return TransferId::from_value(control.aio_data); }
};
struct ScopedAioContext {
ScopedAioContext() = default;
~ScopedAioContext() { reset(); }
ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
ScopedAioContext(const ScopedAioContext& copy) = delete;
ScopedAioContext& operator=(ScopedAioContext&& move) {
reset(move.release());
return *this;
}
ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;
static ScopedAioContext Create(size_t max_events) {
aio_context_t ctx = 0;
if (io_setup(max_events, &ctx) != 0) {
PLOG(FATAL) << "failed to create aio_context_t";
}
ScopedAioContext result;
result.reset(ctx);
return result;
}
aio_context_t release() {
aio_context_t result = context_;
context_ = 0;
return result;
}
void reset(aio_context_t new_context = 0) {
if (context_ != 0) {
io_destroy(context_);
}
context_ = new_context;
}
aio_context_t get() { return context_; }
private:
aio_context_t context_ = 0;
};
struct UsbFfsConnection : public Connection {
UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
std::promise<void> destruction_notifier)
: worker_started_(false),
stopped_(false),
destruction_notifier_(std::move(destruction_notifier)),
control_fd_(std::move(control)),
read_fd_(std::move(read)),
write_fd_(std::move(write)) {
LOG(INFO) << "UsbFfsConnection constructed";
worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
if (worker_event_fd_ == -1) {
PLOG(FATAL) << "failed to create eventfd";
}
monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
if (monitor_event_fd_ == -1) {
PLOG(FATAL) << "failed to create eventfd";
}
aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
}
~UsbFfsConnection() {
LOG(INFO) << "UsbFfsConnection being destroyed";
Stop();
monitor_thread_.join();
// We need to explicitly close our file descriptors before we notify our destruction,
// because the thread listening on the future will immediately try to reopen the endpoint.
aio_context_.reset();
control_fd_.reset();
read_fd_.reset();
write_fd_.reset();
destruction_notifier_.set_value();
}
virtual bool Write(std::unique_ptr<apacket> packet) override final {
LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
Block header(sizeof(packet->msg));
memcpy(header.data(), &packet->msg, sizeof(packet->msg));
std::lock_guard<std::mutex> lock(write_mutex_);
write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++));
if (!packet->payload.empty()) {
// The kernel attempts to allocate a contiguous block of memory for each write,
// which can fail if the write is large and the kernel heap is fragmented.
// Split large writes into smaller chunks to avoid this.
std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload));
size_t offset = 0;
size_t len = payload->size();
while (len > 0) {
size_t write_size = std::min(kUsbWriteSize, len);
write_requests_.push_back(
CreateWriteBlock(payload, offset, write_size, next_write_id_++));
len -= write_size;
offset += write_size;
}
}
SubmitWrites();
return true;
}
virtual void Start() override final { StartMonitor(); }
virtual void Stop() override final {
if (stopped_.exchange(true)) {
return;
}
stopped_ = true;
uint64_t notify = 1;
ssize_t rc = adb_write(worker_event_fd_.get(), ¬ify, sizeof(notify));
if (rc < 0) {
PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
}
CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
rc = adb_write(monitor_event_fd_.get(), ¬ify, sizeof(notify));
if (rc < 0) {
PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
}
CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
}
private:
void StartMonitor() {
// This is a bit of a mess.
// It's possible for io_submit to end up blocking, if we call it as the endpoint
// becomes disabled. Work around this by having a monitor thread to listen for functionfs
// lifecycle events. If we notice an error condition (either we've become disabled, or we
// were never enabled in the first place), we send interruption signals to the worker thread
// until it dies, and then report failure to the transport via HandleError, which will
// eventually result in the transport being destroyed, which will result in UsbFfsConnection
// being destroyed, which unblocks the open thread and restarts this entire process.
static std::once_flag handler_once;
std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });
monitor_thread_ = std::thread([this]() {
adb_thread_setname("UsbFfs-monitor");
bool bound = false;
bool enabled = false;
bool running = true;
while (running) {
adb_pollfd pfd[2] = {
{ .fd = control_fd_.get(), .events = POLLIN, .revents = 0 },
{ .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 },
};
// If we don't see our first bind within a second, try again.
int timeout_ms = bound ? -1 : 1000;
int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
if (rc == -1) {
PLOG(FATAL) << "poll on USB control fd failed";
} else if (rc == 0) {
LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
break;
}
if (pfd[1].revents) {
// We were told to die.
break;
}
struct usb_functionfs_event event;
rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
if (rc == -1) {
PLOG(FATAL) << "failed to read functionfs event";
} else if (rc == 0) {
LOG(WARNING) << "hit EOF on functionfs control fd";
break;
} else if (rc != sizeof(event)) {
LOG(FATAL) << "read functionfs event of unexpected size, expected "
<< sizeof(event) << ", got " << rc;
}
LOG(INFO) << "USB event: "
<< to_string(static_cast<usb_functionfs_event_type>(event.type));
switch (event.type) {
case FUNCTIONFS_BIND:
if (bound) {
LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
running = false;
break;
}
if (enabled) {
LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
running = false;
break;
}
bound = true;
break;
case FUNCTIONFS_ENABLE:
if (!bound) {
LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
running = false;
break;
}
if (enabled) {
LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
running = false;
break;
}
enabled = true;
StartWorker();
break;
case FUNCTIONFS_DISABLE:
if (!bound) {
LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
}
if (!enabled) {
LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
}
enabled = false;
running = false;
break;
case FUNCTIONFS_UNBIND:
if (enabled) {
LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
}
if (!bound) {
LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
}
bound = false;
running = false;
break;
case FUNCTIONFS_SETUP: {
LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
<< static_cast<int>(event.u.setup.bRequestType)
<< ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
<< ", wValue = " << static_cast<int>(event.u.setup.wValue)
<< ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
<< ", wLength = " << static_cast<int>(event.u.setup.wLength);
if ((event.u.setup.bRequestType & USB_DIR_IN)) {
LOG(INFO) << "acking device-to-host control transfer";
ssize_t rc = adb_write(control_fd_.get(), "", 0);
if (rc != 0) {
PLOG(ERROR) << "failed to write empty packet to host";
break;
}
} else {
std::string buf;
buf.resize(event.u.setup.wLength + 1);
ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
if (rc != event.u.setup.wLength) {
LOG(ERROR)
<< "read " << rc
<< " bytes when trying to read control request, expected "
<< event.u.setup.wLength;
}
LOG(INFO) << "control request contents: " << buf;
break;
}
}
}
}
StopWorker();
HandleError("monitor thread finished");
});
}
void StartWorker() {
CHECK(!worker_started_);
worker_started_ = true;
worker_thread_ = std::thread([this]() {
adb_thread_setname("UsbFfs-worker");
for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
read_requests_[i] = CreateReadBlock(next_read_id_++);
if (!SubmitRead(&read_requests_[i])) {
return;
}
}
while (!stopped_) {
uint64_t dummy;
ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
if (rc == -1) {
PLOG(FATAL) << "failed to read from eventfd";
} else if (rc == 0) {
LOG(FATAL) << "hit EOF on eventfd";
}
ReadEvents();
}
});
}
void StopWorker() {
if (!worker_started_) {
return;
}
pthread_t worker_thread_handle = worker_thread_.native_handle();
while (true) {
int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
if (rc != 0) {
LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
break;
}
std::this_thread::sleep_for(100ms);
rc = pthread_kill(worker_thread_handle, 0);
if (rc == 0) {
continue;
} else if (rc == ESRCH) {
break;
} else {
LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
}
}
worker_thread_.join();
}
void PrepareReadBlock(IoBlock* block, uint64_t id) {
block->pending = false;
block->payload = std::make_shared<Block>(kUsbReadSize);
block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data());
block->control.aio_nbytes = block->payload->size();
}
IoBlock CreateReadBlock(uint64_t id) {
IoBlock block;
PrepareReadBlock(&block, id);
block.control.aio_rw_flags = 0;
block.control.aio_lio_opcode = IOCB_CMD_PREAD;
block.control.aio_reqprio = 0;
block.control.aio_fildes = read_fd_.get();
block.control.aio_offset = 0;
block.control.aio_flags = IOCB_FLAG_RESFD;
block.control.aio_resfd = worker_event_fd_.get();
return block;
}
void ReadEvents() {
static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
struct io_event events[kMaxEvents];
struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
if (rc == -1) {
HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
return;
}
for (int event_idx = 0; event_idx < rc; ++event_idx) {
auto& event = events[event_idx];
TransferId id = TransferId::from_value(event.data);
if (event.res < 0) {
std::string error =
StringPrintf("%s %" PRIu64 " failed with error %s",
id.direction == TransferDirection::READ ? "read" : "write",
id.id, strerror(-event.res));
HandleError(error);
return;
}
if (id.direction == TransferDirection::READ) {
HandleRead(id, event.res);
} else {
HandleWrite(id);
}
}
}
void HandleRead(TransferId id, int64_t size) {
uint64_t read_idx = id.id % kUsbReadQueueDepth;
IoBlock* block = &read_requests_[read_idx];
block->pending = false;
block->payload->resize(size);
// Notification for completed reads can be received out of order.
if (block->id().id != needed_read_id_) {
LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
<< needed_read_id_;
return;
}
for (uint64_t id = needed_read_id_;; ++id) {
size_t read_idx = id % kUsbReadQueueDepth;
IoBlock* current_block = &read_requests_[read_idx];
if (current_block->pending) {
break;
}
ProcessRead(current_block);
++needed_read_id_;
}
}
void ProcessRead(IoBlock* block) {
if (!block->payload->empty()) {
if (!incoming_header_.has_value()) {
CHECK_EQ(sizeof(amessage), block->payload->size());
amessage msg;
memcpy(&msg, block->payload->data(), sizeof(amessage));
LOG(DEBUG) << "USB read:" << dump_header(&msg);
incoming_header_ = msg;
} else {
size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
Block payload = std::move(*block->payload);
CHECK_LE(payload.size(), bytes_left);
incoming_payload_.append(std::make_unique<Block>(std::move(payload)));
}
if (incoming_header_->data_length == incoming_payload_.size()) {
auto packet = std::make_unique<apacket>();
packet->msg = *incoming_header_;
// TODO: Make apacket contain an IOVector so we don't have to coalesce.
packet->payload = incoming_payload_.coalesce();
read_callback_(this, std::move(packet));
incoming_header_.reset();
incoming_payload_.clear();
}
}
PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
SubmitRead(block);
}
bool SubmitRead(IoBlock* block) {
block->pending = true;
struct iocb* iocb = &block->control;
if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
if (errno == EINVAL && !gFfsAioSupported.has_value()) {
HandleError("failed to submit first read, AIO on FFS not supported");
gFfsAioSupported = false;
return false;
}
HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
return false;
}
gFfsAioSupported = true;
return true;
}
void HandleWrite(TransferId id) {
std::lock_guard<std::mutex> lock(write_mutex_);
auto it =
std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id);
});
CHECK(it != write_requests_.end());
write_requests_.erase(it);
size_t outstanding_writes = --writes_submitted_;
LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
SubmitWrites();
}
std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset,
size_t len, uint64_t id) {
auto block = std::make_unique<IoBlock>();
block->payload = std::move(payload);
block->control.aio_data = static_cast<uint64_t>(TransferId::write(id));
block->control.aio_rw_flags = 0;
block->control.aio_lio_opcode = IOCB_CMD_PWRITE;
block->control.aio_reqprio = 0;
block->control.aio_fildes = write_fd_.get();
block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset);
block->control.aio_nbytes = len;
block->control.aio_offset = 0;
block->control.aio_flags = IOCB_FLAG_RESFD;
block->control.aio_resfd = worker_event_fd_.get();
return block;
}
std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload));
size_t len = block->size();
return CreateWriteBlock(std::move(block), 0, len, id);
}
void SubmitWrites() REQUIRES(write_mutex_) {
if (writes_submitted_ == kUsbWriteQueueDepth) {
return;
}
ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
write_requests_.size() - writes_submitted_);
CHECK_GE(writes_to_submit, 0);
if (writes_to_submit == 0) {
return;
}
struct iocb* iocbs[kUsbWriteQueueDepth];
for (int i = 0; i < writes_to_submit; ++i) {
CHECK(!write_requests_[writes_submitted_ + i]->pending);
write_requests_[writes_submitted_ + i]->pending = true;
iocbs[i] = &write_requests_[writes_submitted_ + i]->control;
LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
}
writes_submitted_ += writes_to_submit;
int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
if (rc == -1) {
HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
return;
} else if (rc != writes_to_submit) {
LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
<< ", actually submitted " << rc;
}
}
void HandleError(const std::string& error) {
std::call_once(error_flag_, [&]() {
error_callback_(this, error);
if (!stopped_) {
Stop();
}
});
}
std::thread monitor_thread_;
bool worker_started_;
std::thread worker_thread_;
std::atomic<bool> stopped_;
std::promise<void> destruction_notifier_;
std::once_flag error_flag_;
unique_fd worker_event_fd_;
unique_fd monitor_event_fd_;
ScopedAioContext aio_context_;
unique_fd control_fd_;
unique_fd read_fd_;
unique_fd write_fd_;
std::optional<amessage> incoming_header_;
IOVector incoming_payload_;
std::array<IoBlock, kUsbReadQueueDepth> read_requests_;
IOVector read_data_;
// ID of the next request that we're going to send out.
size_t next_read_id_ = 0;
// ID of the next packet we're waiting for.
size_t needed_read_id_ = 0;
std::mutex write_mutex_;
std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_);
size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;
static constexpr int kInterruptionSignal = SIGUSR1;
};
void usb_init_legacy();
static void usb_ffs_open_thread() {
adb_thread_setname("usb ffs open");
while (true) {
if (gFfsAioSupported.has_value() && !gFfsAioSupported.value()) {
LOG(INFO) << "failed to use nonblocking ffs, falling back to legacy";
return usb_init_legacy();
}
unique_fd control;
unique_fd bulk_out;
unique_fd bulk_in;
if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
std::this_thread::sleep_for(1s);
continue;
}
atransport* transport = new atransport();
transport->serial = "UsbFfs";
std::promise<void> destruction_notifier;
std::future<void> future = destruction_notifier.get_future();
transport->SetConnection(std::make_unique<UsbFfsConnection>(
std::move(control), std::move(bulk_out), std::move(bulk_in),
std::move(destruction_notifier)));
register_transport(transport);
future.wait();
}
}
void usb_init() {
bool use_nonblocking = android::base::GetBoolProperty(
"persist.adb.nonblocking_ffs",
android::base::GetBoolProperty("ro.adb.nonblocking_ffs", true));
if (use_nonblocking) {
std::thread(usb_ffs_open_thread).detach();
} else {
usb_init_legacy();
}
}