普通文本  |  428行  |  13.01 KB

/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "perfetto/public/consumer_api.h"

#include <fcntl.h>
#include <inttypes.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <unistd.h>

#include <atomic>
#include <memory>
#include <mutex>
#include <thread>

#include "perfetto/base/build_config.h"
#include "perfetto/base/event.h"
#include "perfetto/base/scoped_file.h"
#include "perfetto/base/temp_file.h"
#include "perfetto/base/thread_checker.h"
#include "perfetto/base/unix_task_runner.h"
#include "perfetto/base/utils.h"
#include "perfetto/tracing/core/consumer.h"
#include "perfetto/tracing/core/trace_config.h"
#include "perfetto/tracing/core/trace_packet.h"
#include "perfetto/tracing/ipc/consumer_ipc_client.h"
#include "src/tracing/ipc/default_socket.h"

#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
#include <linux/memfd.h>
#include <sys/syscall.h>
#endif

#include "perfetto/config/trace_config.pb.h"

#define PERFETTO_EXPORTED_API __attribute__((visibility("default")))

namespace perfetto {
namespace consumer {

namespace {

class TracingSession : public Consumer {
 public:
  TracingSession(base::TaskRunner*,
                 Handle,
                 OnStateChangedCb,
                 void* callback_arg,
                 const perfetto::protos::TraceConfig&);
  ~TracingSession() override;

  // Note: if making this class moveable, the move-ctor/dtor must be updated
  // to clear up mapped_buf_ on dtor.

  // These methods are called on a thread != |task_runner_|.
  State state() const { return state_; }
  std::pair<char*, size_t> mapped_buf() const {
    // The comparison operator will do an acquire-load on the atomic |state_|.
    if (state_ == State::kTraceEnded)
      return std::make_pair(mapped_buf_, mapped_buf_size_);
    return std::make_pair(nullptr, 0);
  }

  // All the methods below are called only on the |task_runner_| thread.

  bool Initialize();
  void StartTracing();

  // perfetto::Consumer implementation.
  void OnConnect() override;
  void OnDisconnect() override;
  void OnTracingDisabled() override;
  void OnTraceData(std::vector<TracePacket>, bool has_more) override;
  void OnDetach(bool) override;
  void OnAttach(bool, const TraceConfig&) override;
  void OnTraceStats(bool, const TraceStats&) override;
  void OnObservableEvents(const ObservableEvents&) override;

 private:
  TracingSession(const TracingSession&) = delete;
  TracingSession& operator=(const TracingSession&) = delete;

  void DestroyConnection();
  void NotifyCallback();

  base::TaskRunner* const task_runner_;
  Handle const handle_;
  OnStateChangedCb const callback_ = nullptr;
  void* const callback_arg_ = nullptr;
  TraceConfig trace_config_;
  base::ScopedFile buf_fd_;
  std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;

  // |mapped_buf_| and |mapped_buf_size_| are seq-consistent with |state_|.
  std::atomic<State> state_{State::kIdle};
  char* mapped_buf_ = nullptr;
  size_t mapped_buf_size_ = 0;

  PERFETTO_THREAD_CHECKER(thread_checker_)
};

TracingSession::TracingSession(
    base::TaskRunner* task_runner,
    Handle handle,
    OnStateChangedCb callback,
    void* callback_arg,
    const perfetto::protos::TraceConfig& trace_config_proto)
    : task_runner_(task_runner),
      handle_(handle),
      callback_(callback),
      callback_arg_(callback_arg) {
  PERFETTO_DETACH_FROM_THREAD(thread_checker_);
  trace_config_.FromProto(trace_config_proto);
  trace_config_.set_write_into_file(true);

  // TODO(primiano): this really doesn't matter because the trace will be
  // flushed into the file when stopping. We need a way to be able to say
  // "disable periodic flushing and flush only when stopping".
  trace_config_.set_file_write_period_ms(60000);
}

TracingSession::~TracingSession() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (mapped_buf_)
    PERFETTO_CHECK(munmap(mapped_buf_, mapped_buf_size_) == 0);
}

bool TracingSession::Initialize() {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (state_ != State::kIdle)
    return false;

#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  char memfd_name[64];
  snprintf(memfd_name, sizeof(memfd_name), "perfetto_trace_%" PRId64, handle_);
  buf_fd_.reset(
      static_cast<int>(syscall(__NR_memfd_create, memfd_name, MFD_CLOEXEC)));
#else
  // Fallback for testing on Linux/mac.
  buf_fd_ = base::TempFile::CreateUnlinked().ReleaseFD();
#endif

  if (!buf_fd_) {
    PERFETTO_PLOG("Failed to allocate temporary tracing buffer");
    return false;
  }

  state_ = State::kConnecting;
  consumer_endpoint_ =
      ConsumerIPCClient::Connect(GetConsumerSocket(), this, task_runner_);

  return true;
}

// Called after EnabledTracing, soon after the IPC connection is established.
void TracingSession::OnConnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  PERFETTO_DLOG("OnConnect");
  PERFETTO_DCHECK(state_ == State::kConnecting);
  consumer_endpoint_->EnableTracing(trace_config_,
                                    base::ScopedFile(dup(*buf_fd_)));
  if (trace_config_.deferred_start())
    state_ = State::kConfigured;
  else
    state_ = State::kTracing;
  NotifyCallback();
}

void TracingSession::StartTracing() {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto state = state_.load();
  if (state != State::kConfigured) {
    PERFETTO_ELOG("StartTracing(): invalid state (%d)",
                  static_cast<int>(state));
    return;
  }
  state_ = State::kTracing;
  consumer_endpoint_->StartTracing();
}

void TracingSession::OnTracingDisabled() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("OnTracingDisabled");

  struct stat stat_buf {};
  int res = fstat(buf_fd_.get(), &stat_buf);
  mapped_buf_size_ = res == 0 ? static_cast<size_t>(stat_buf.st_size) : 0;
  mapped_buf_ =
      static_cast<char*>(mmap(nullptr, mapped_buf_size_, PROT_READ | PROT_WRITE,
                              MAP_SHARED, buf_fd_.get(), 0));
  DestroyConnection();
  if (mapped_buf_size_ == 0 || mapped_buf_ == MAP_FAILED) {
    mapped_buf_ = nullptr;
    mapped_buf_size_ = 0;
    state_ = State::kTraceFailed;
    PERFETTO_ELOG("Tracing session failed");
  } else {
    state_ = State::kTraceEnded;
  }
  NotifyCallback();
}

void TracingSession::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("OnDisconnect");
  DestroyConnection();
  state_ = State::kConnectionError;
  NotifyCallback();
}

void TracingSession::OnDetach(bool) {
  PERFETTO_DCHECK(false);  // Should never be called, Detach() is not used here.
}

void TracingSession::OnAttach(bool, const TraceConfig&) {
  PERFETTO_DCHECK(false);  // Should never be called, Attach() is not used here.
}

void TracingSession::OnTraceStats(bool, const TraceStats&) {
  // Should never be called, GetTraceStats() is not used here.
  PERFETTO_DCHECK(false);
}

void TracingSession::OnObservableEvents(const ObservableEvents&) {
  // Should never be called, ObserveEvents() is not used here.
  PERFETTO_DCHECK(false);
}

void TracingSession::DestroyConnection() {
  // Destroys the connection in a separate task. This is to avoid destroying
  // the IPC connection directly from within the IPC callback.
  TracingService::ConsumerEndpoint* endpoint = consumer_endpoint_.release();
  task_runner_->PostTask([endpoint] { delete endpoint; });
}

void TracingSession::OnTraceData(std::vector<TracePacket>, bool) {
  // This should be never called because we are using |write_into_file| and
  // asking the traced service to directly write into the |buf_fd_|.
  PERFETTO_DFATAL("Should be unreachable.");
}

void TracingSession::NotifyCallback() {
  if (!callback_)
    return;
  auto state = state_.load();
  auto callback = callback_;
  auto handle = handle_;
  auto callback_arg = callback_arg_;
  task_runner_->PostTask([callback, callback_arg, handle, state] {
    callback(handle, state, callback_arg);
  });
}

class TracingController {
 public:
  static TracingController* GetInstance();
  TracingController();

  // These methods are called from a thread != |task_runner_|.
  Handle Create(const void*, size_t, OnStateChangedCb, void* callback_arg);
  void StartTracing(Handle);
  State PollState(Handle);
  TraceBuffer ReadTrace(Handle);
  void Destroy(Handle);

 private:
  void ThreadMain();  // Called on |task_runner_| thread.

  std::mutex mutex_;
  std::thread thread_;
  std::unique_ptr<base::UnixTaskRunner> task_runner_;
  std::condition_variable task_runner_initialized_;
  Handle last_handle_ = 0;
  std::map<Handle, std::unique_ptr<TracingSession>> sessions_;
};

TracingController* TracingController::GetInstance() {
  static TracingController* instance = new TracingController();
  return instance;
}

TracingController::TracingController()
    : thread_(&TracingController::ThreadMain, this) {
  std::unique_lock<std::mutex> lock(mutex_);
  task_runner_initialized_.wait(lock, [this] { return !!task_runner_; });
}

void TracingController::ThreadMain() {
  {
    std::unique_lock<std::mutex> lock(mutex_);
    task_runner_.reset(new base::UnixTaskRunner());
  }
  task_runner_initialized_.notify_one();
  task_runner_->Run();
}

Handle TracingController::Create(const void* config_proto_buf,
                                 size_t config_len,
                                 OnStateChangedCb callback,
                                 void* callback_arg) {
  perfetto::protos::TraceConfig config_proto;
  bool parsed = config_proto.ParseFromArray(config_proto_buf,
                                            static_cast<int>(config_len));
  if (!parsed) {
    PERFETTO_ELOG("Failed to decode TraceConfig proto");
    return kInvalidHandle;
  }

  if (!config_proto.duration_ms()) {
    PERFETTO_ELOG("The trace config must specify a duration");
    return kInvalidHandle;
  }

  std::unique_lock<std::mutex> lock(mutex_);
  Handle handle = ++last_handle_;
  auto* session = new TracingSession(task_runner_.get(), handle, callback,
                                     callback_arg, config_proto);
  sessions_.emplace(handle, std::unique_ptr<TracingSession>(session));

  // Enable the TracingSession on its own thread.
  task_runner_->PostTask([session] { session->Initialize(); });

  return handle;
}

void TracingController::StartTracing(Handle handle) {
  std::unique_lock<std::mutex> lock(mutex_);
  auto it = sessions_.find(handle);
  if (it == sessions_.end()) {
    PERFETTO_ELOG("StartTracing(): Invalid tracing session handle");
    return;
  };
  TracingSession* session = it->second.get();
  task_runner_->PostTask([session] { session->StartTracing(); });
}

State TracingController::PollState(Handle handle) {
  std::unique_lock<std::mutex> lock(mutex_);
  auto it = sessions_.find(handle);
  if (it == sessions_.end())
    return State::kSessionNotFound;
  return it->second->state();
}

TraceBuffer TracingController::ReadTrace(Handle handle) {
  TraceBuffer buf{};

  std::unique_lock<std::mutex> lock(mutex_);
  auto it = sessions_.find(handle);
  if (it == sessions_.end()) {
    PERFETTO_DLOG("Handle invalid");
    return buf;
  }

  TracingSession* session = it->second.get();
  auto state = session->state();
  if (state == State::kTraceEnded) {
    std::tie(buf.begin, buf.size) = session->mapped_buf();
    return buf;
  }

  PERFETTO_DLOG("ReadTrace(): called in an unexpected state (%d)",
                static_cast<int>(state));
  return buf;
}

void TracingController::Destroy(Handle handle) {
  // Post an empty task on the task runner to delete the session on its own
  // thread.
  std::unique_lock<std::mutex> lock(mutex_);
  auto it = sessions_.find(handle);
  if (it == sessions_.end())
    return;
  TracingSession* session = it->second.release();
  sessions_.erase(it);
  task_runner_->PostTask([session] { delete session; });
}

}  // namespace

PERFETTO_EXPORTED_API Handle Create(const void* config_proto,
                                    size_t config_len,
                                    OnStateChangedCb callback,
                                    void* callback_arg) {
  return TracingController::GetInstance()->Create(config_proto, config_len,
                                                  callback, callback_arg);
}

PERFETTO_EXPORTED_API
void StartTracing(Handle handle) {
  return TracingController::GetInstance()->StartTracing(handle);
}

PERFETTO_EXPORTED_API State PollState(Handle handle) {
  return TracingController::GetInstance()->PollState(handle);
}

PERFETTO_EXPORTED_API TraceBuffer ReadTrace(Handle handle) {
  return TracingController::GetInstance()->ReadTrace(handle);
}

PERFETTO_EXPORTED_API void Destroy(Handle handle) {
  TracingController::GetInstance()->Destroy(handle);
}
}  // namespace consumer
}  // namespace perfetto