/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "perfetto/public/consumer_api.h"
#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include "perfetto/base/build_config.h"
#include "perfetto/base/event.h"
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/temp_file.h"
#include "perfetto/base/thread_checker.h"
#include "perfetto/base/unix_task_runner.h"
#include "perfetto/base/utils.h"
#include "perfetto/tracing/core/consumer.h"
#include "perfetto/tracing/core/trace_config.h"
#include "perfetto/tracing/core/trace_packet.h"
#include "perfetto/tracing/ipc/consumer_ipc_client.h"
#include "src/tracing/ipc/default_socket.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
#include <linux/memfd.h>
#include <sys/syscall.h>
#endif
#include "perfetto/config/trace_config.pb.h"
#define PERFETTO_EXPORTED_API __attribute__((visibility("default")))
namespace perfetto {
namespace consumer {
namespace {
class TracingSession : public Consumer {
public:
TracingSession(base::TaskRunner*,
Handle,
OnStateChangedCb,
void* callback_arg,
const perfetto::protos::TraceConfig&);
~TracingSession() override;
// Note: if making this class moveable, the move-ctor/dtor must be updated
// to clear up mapped_buf_ on dtor.
// These methods are called on a thread != |task_runner_|.
State state() const { return state_; }
std::pair<char*, size_t> mapped_buf() const {
// The comparison operator will do an acquire-load on the atomic |state_|.
if (state_ == State::kTraceEnded)
return std::make_pair(mapped_buf_, mapped_buf_size_);
return std::make_pair(nullptr, 0);
}
// All the methods below are called only on the |task_runner_| thread.
bool Initialize();
void StartTracing();
// perfetto::Consumer implementation.
void OnConnect() override;
void OnDisconnect() override;
void OnTracingDisabled() override;
void OnTraceData(std::vector<TracePacket>, bool has_more) override;
void OnDetach(bool) override;
void OnAttach(bool, const TraceConfig&) override;
void OnTraceStats(bool, const TraceStats&) override;
void OnObservableEvents(const ObservableEvents&) override;
private:
TracingSession(const TracingSession&) = delete;
TracingSession& operator=(const TracingSession&) = delete;
void DestroyConnection();
void NotifyCallback();
base::TaskRunner* const task_runner_;
Handle const handle_;
OnStateChangedCb const callback_ = nullptr;
void* const callback_arg_ = nullptr;
TraceConfig trace_config_;
base::ScopedFile buf_fd_;
std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
// |mapped_buf_| and |mapped_buf_size_| are seq-consistent with |state_|.
std::atomic<State> state_{State::kIdle};
char* mapped_buf_ = nullptr;
size_t mapped_buf_size_ = 0;
PERFETTO_THREAD_CHECKER(thread_checker_)
};
// Stores the callback/handle and derives |trace_config_| from the caller's
// proto, forcing the write-into-file mode this class relies on.
TracingSession::TracingSession(
    base::TaskRunner* task_runner,
    Handle handle,
    OnStateChangedCb callback,
    void* callback_arg,
    const perfetto::protos::TraceConfig& trace_config_proto)
    : task_runner_(task_runner),
      handle_(handle),
      callback_(callback),
      callback_arg_(callback_arg) {
  // The thread checker is detached here so that it does not bind to the
  // constructing thread.
  PERFETTO_DETACH_FROM_THREAD(thread_checker_);

  trace_config_.FromProto(trace_config_proto);

  // Overrides applied on top of the caller-provided config.
  trace_config_.set_write_into_file(true);

  // TODO(primiano): the period below really doesn't matter, because the trace
  // is flushed into the file when stopping. What is needed is a way to say
  // "disable periodic flushing and flush only when stopping".
  trace_config_.set_file_write_period_ms(60000);
}
// Unmaps the trace buffer, if one was mapped. Failure to unmap is fatal.
TracingSession::~TracingSession() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (mapped_buf_ == nullptr)
    return;  // Nothing was ever mapped (or mapping failed).
  PERFETTO_CHECK(munmap(mapped_buf_, mapped_buf_size_) == 0);
}
// Creates the file that will hold the trace and starts connecting to the
// consumer socket. Returns false, leaving |state_| untouched, when the session
// is not in kIdle or when the buffer file cannot be created.
bool TracingSession::Initialize() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (state_.load() != State::kIdle)
    return false;

#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // The buffer is a memfd named after the session handle.
  char name[64];
  snprintf(name, sizeof(name), "perfetto_trace_%" PRId64, handle_);
  const long raw_fd = syscall(__NR_memfd_create, name, MFD_CLOEXEC);
  buf_fd_.reset(static_cast<int>(raw_fd));
#else
  // Fallback for testing on Linux/mac.
  buf_fd_ = base::TempFile::CreateUnlinked().ReleaseFD();
#endif

  if (!buf_fd_) {
    PERFETTO_PLOG("Failed to allocate temporary tracing buffer");
    return false;
  }

  state_.store(State::kConnecting);
  consumer_endpoint_ =
      ConsumerIPCClient::Connect(GetConsumerSocket(), this, task_runner_);
  return true;
}
// Called after EnabledTracing, soon after the IPC connection is established.
void TracingSession::OnConnect() {
PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DLOG("OnConnect");
PERFETTO_DCHECK(state_ == State::kConnecting);
consumer_endpoint_->EnableTracing(trace_config_,
base::ScopedFile(dup(*buf_fd_)));
if (trace_config_.deferred_start())
state_ = State::kConfigured;
else
state_ = State::kTracing;
NotifyCallback();
}
void TracingSession::StartTracing() {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto state = state_.load();
if (state != State::kConfigured) {
PERFETTO_ELOG("StartTracing(): invalid state (%d)",
static_cast<int>(state));
return;
}
state_ = State::kTracing;
consumer_endpoint_->StartTracing();
}
// Consumer callback invoked when tracing is disabled. Maps the contents of
// |buf_fd_| into memory, tears down the IPC connection and publishes the final
// state (kTraceEnded on success, kTraceFailed otherwise) to the client
// callback.
void TracingSession::OnTracingDisabled() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("OnTracingDisabled");

  // Size of the trace file. A failed fstat() or a non-positive size is treated
  // as "no data" rather than being cast to a (possibly huge) size_t.
  struct stat stat_buf {};
  size_t trace_size = 0;
  if (fstat(buf_fd_.get(), &stat_buf) == 0 && stat_buf.st_size > 0)
    trace_size = static_cast<size_t>(stat_buf.st_size);

  // Only attempt the mapping when there is something to map, instead of
  // relying on mmap() rejecting a zero length. The result is kept in a local
  // so that |mapped_buf_| never holds the MAP_FAILED sentinel.
  void* mapping = MAP_FAILED;
  if (trace_size > 0) {
    mapping = mmap(nullptr, trace_size, PROT_READ | PROT_WRITE, MAP_SHARED,
                   buf_fd_.get(), 0);
  }

  DestroyConnection();

  // The buffer fields are written before |state_| so that a reader observing
  // kTraceEnded sees a consistent buffer.
  if (mapping == MAP_FAILED) {
    mapped_buf_ = nullptr;
    mapped_buf_size_ = 0;
    state_ = State::kTraceFailed;
    PERFETTO_ELOG("Tracing session failed");
  } else {
    mapped_buf_ = static_cast<char*>(mapping);
    mapped_buf_size_ = trace_size;
    state_ = State::kTraceEnded;
  }
  NotifyCallback();
}
// Consumer callback for a lost connection: drops the endpoint, moves to
// kConnectionError and notifies the client callback.
void TracingSession::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("OnDisconnect");

  DestroyConnection();
  state_.store(State::kConnectionError);
  NotifyCallback();
}
// Detach() is not used by this class, so this callback should never fire.
void TracingSession::OnDetach(bool) {
  PERFETTO_DCHECK(false);
}
// Attach() is not used by this class, so this callback should never fire.
void TracingSession::OnAttach(bool, const TraceConfig&) {
  PERFETTO_DCHECK(false);
}
// GetTraceStats() is not used by this class, so this callback should never
// fire.
void TracingSession::OnTraceStats(bool, const TraceStats&) {
  PERFETTO_DCHECK(false);
}
// ObserveEvents() is not used by this class, so this callback should never
// fire.
void TracingSession::OnObservableEvents(const ObservableEvents&) {
  PERFETTO_DCHECK(false);
}
void TracingSession::DestroyConnection() {
// Destroys the connection in a separate task. This is to avoid destroying
// the IPC connection directly from within the IPC callback.
TracingService::ConsumerEndpoint* endpoint = consumer_endpoint_.release();
task_runner_->PostTask([endpoint] { delete endpoint; });
}
// Never expected to run: the config sets |write_into_file|, asking the traced
// service to write directly into |buf_fd_| instead of delivering packets here.
void TracingSession::OnTraceData(std::vector<TracePacket>, bool) {
  PERFETTO_DFATAL("Should be unreachable.");
}
void TracingSession::NotifyCallback() {
if (!callback_)
return;
auto state = state_.load();
auto callback = callback_;
auto handle = handle_;
auto callback_arg = callback_arg_;
task_runner_->PostTask([callback, callback_arg, handle, state] {
callback(handle, state, callback_arg);
});
}
// Owns the task-runner thread and the handle -> TracingSession map behind the
// exported API functions. Obtained through GetInstance().
class TracingController {
 public:
  static TracingController* GetInstance();
  TracingController();

  // These methods are called from a thread != |task_runner_|.
  Handle Create(const void*, size_t, OnStateChangedCb, void* callback_arg);
  void StartTracing(Handle);
  State PollState(Handle);
  TraceBuffer ReadTrace(Handle);
  void Destroy(Handle);

 private:
  void ThreadMain();  // Called on |task_runner_| thread.

  // Guards |task_runner_| initialization, |last_handle_| and |sessions_|.
  std::mutex mutex_;
  std::unique_ptr<base::UnixTaskRunner> task_runner_;
  std::condition_variable task_runner_initialized_;
  Handle last_handle_ = 0;
  std::map<Handle, std::unique_ptr<TracingSession>> sessions_;

  // Keep |thread_| as the LAST data member. Members are constructed in
  // declaration order and the thread may start executing ThreadMain() as soon
  // as it is constructed. ThreadMain() touches |mutex_|, |task_runner_| and
  // |task_runner_initialized_|, so they must be fully constructed first.
  std::thread thread_;
};
// Returns the process-wide controller, creating it on first use. The object is
// heap-allocated and never deleted in this file.
TracingController* TracingController::GetInstance() {
  static TracingController* const controller = new TracingController();
  return controller;
}
// Spawns the task-runner thread and blocks until that thread has created
// |task_runner_|, so the public methods can post tasks as soon as the
// constructor returns.
TracingController::TracingController() {
  // The thread is started from the constructor body, not the member
  // initializer list. At this point every data member is fully constructed,
  // whatever the declaration order, so ThreadMain() cannot race with the
  // construction of |task_runner_| or |task_runner_initialized_|.
  thread_ = std::thread(&TracingController::ThreadMain, this);

  std::unique_lock<std::mutex> lock(mutex_);
  task_runner_initialized_.wait(lock,
                                [this] { return task_runner_ != nullptr; });
}
void TracingController::ThreadMain() {
{
std::unique_lock<std::mutex> lock(mutex_);
task_runner_.reset(new base::UnixTaskRunner());
}
task_runner_initialized_.notify_one();
task_runner_->Run();
}
// Decodes the serialized TraceConfig in [config_proto_buf, +config_len),
// registers a new TracingSession for it and schedules its initialization on
// the task-runner thread.
//
// Returns the handle of the new session, or kInvalidHandle if the buffer is
// too large, cannot be decoded, or does not specify a duration.
// |callback| (may be null) is later invoked with |callback_arg| on state
// changes.
Handle TracingController::Create(const void* config_proto_buf,
                                 size_t config_len,
                                 OnStateChangedCb callback,
                                 void* callback_arg) {
  // ParseFromArray() takes the length as an int. Reject lengths that do not
  // survive the narrowing, rather than parsing a truncated or negative size.
  const int proto_len = static_cast<int>(config_len);
  if (proto_len < 0 || static_cast<size_t>(proto_len) != config_len) {
    PERFETTO_ELOG("TraceConfig proto is too large");
    return kInvalidHandle;
  }

  perfetto::protos::TraceConfig config_proto;
  if (!config_proto.ParseFromArray(config_proto_buf, proto_len)) {
    PERFETTO_ELOG("Failed to decode TraceConfig proto");
    return kInvalidHandle;
  }
  if (!config_proto.duration_ms()) {
    PERFETTO_ELOG("The trace config must specify a duration");
    return kInvalidHandle;
  }

  std::unique_lock<std::mutex> lock(mutex_);
  const Handle handle = ++last_handle_;

  // Owned by a unique_ptr from the moment it is created, so it cannot leak if
  // inserting into the map fails.
  std::unique_ptr<TracingSession> owned_session(new TracingSession(
      task_runner_.get(), handle, callback, callback_arg, config_proto));
  TracingSession* const session = owned_session.get();
  sessions_.emplace(handle, std::move(owned_session));

  // Enable the TracingSession on its own thread.
  task_runner_->PostTask([session] { session->Initialize(); });
  return handle;
}
// Looks up the session for |handle| and asks it, on the task-runner thread, to
// start tracing. Unknown handles are logged and ignored.
void TracingController::StartTracing(Handle handle) {
  std::lock_guard<std::mutex> guard(mutex_);

  auto entry = sessions_.find(handle);
  if (entry == sessions_.end()) {
    PERFETTO_ELOG("StartTracing(): Invalid tracing session handle");
    return;
  }

  TracingSession* const target = entry->second.get();
  task_runner_->PostTask([target] { target->StartTracing(); });
}
// Returns the current state of the session identified by |handle|, or
// kSessionNotFound when no such session is registered.
State TracingController::PollState(Handle handle) {
  std::lock_guard<std::mutex> guard(mutex_);
  auto entry = sessions_.find(handle);
  const bool known = entry != sessions_.end();
  return known ? entry->second->state() : State::kSessionNotFound;
}
// Returns the mapped trace of the session identified by |handle|. The result
// is a zero-initialized TraceBuffer when the handle is unknown or the session
// is not in kTraceEnded.
TraceBuffer TracingController::ReadTrace(Handle handle) {
  TraceBuffer result{};
  std::lock_guard<std::mutex> guard(mutex_);

  auto entry = sessions_.find(handle);
  if (entry == sessions_.end()) {
    PERFETTO_DLOG("Handle invalid");
    return result;
  }

  TracingSession* const session = entry->second.get();
  const State session_state = session->state();
  if (session_state != State::kTraceEnded) {
    PERFETTO_DLOG("ReadTrace(): called in an unexpected state (%d)",
                  static_cast<int>(session_state));
    return result;
  }

  const std::pair<char*, size_t> mapping = session->mapped_buf();
  result.begin = mapping.first;
  result.size = mapping.second;
  return result;
}
// Unregisters the session identified by |handle| and deletes it on the
// task-runner thread. Unknown handles are silently ignored.
void TracingController::Destroy(Handle handle) {
  std::lock_guard<std::mutex> guard(mutex_);

  auto entry = sessions_.find(handle);
  if (entry == sessions_.end())
    return;

  // Take ownership out of the map and hand the raw pointer to a task, so the
  // session is deleted on its own thread.
  TracingSession* const doomed = entry->second.release();
  sessions_.erase(entry);
  task_runner_->PostTask([doomed] { delete doomed; });
}
} // namespace
// Exported entry point: forwards to TracingController::Create() on the
// process-wide controller.
PERFETTO_EXPORTED_API Handle Create(const void* config_proto,
                                    size_t config_len,
                                    OnStateChangedCb callback,
                                    void* callback_arg) {
  TracingController* const controller = TracingController::GetInstance();
  return controller->Create(config_proto, config_len, callback, callback_arg);
}
// Exported entry point: forwards to TracingController::StartTracing() on the
// process-wide controller.
PERFETTO_EXPORTED_API
void StartTracing(Handle handle) {
  TracingController* const controller = TracingController::GetInstance();
  controller->StartTracing(handle);
}
// Exported entry point: forwards to TracingController::PollState() on the
// process-wide controller.
PERFETTO_EXPORTED_API State PollState(Handle handle) {
  TracingController* const controller = TracingController::GetInstance();
  return controller->PollState(handle);
}
// Exported entry point: forwards to TracingController::ReadTrace() on the
// process-wide controller.
PERFETTO_EXPORTED_API TraceBuffer ReadTrace(Handle handle) {
  TracingController* const controller = TracingController::GetInstance();
  return controller->ReadTrace(handle);
}
// Exported entry point: forwards to TracingController::Destroy() on the
// process-wide controller.
PERFETTO_EXPORTED_API void Destroy(Handle handle) {
  TracingController* const controller = TracingController::GetInstance();
  controller->Destroy(handle);
}
} // namespace consumer
} // namespace perfetto