普通文本  |  281行  |  8.59 KB

/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/profiling/memory/shared_ring_buffer.h"

#include <array>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_map>

#include "gtest/gtest.h"
#include "perfetto/base/optional.h"

namespace perfetto {
namespace profiling {
namespace {

std::string ToString(const SharedRingBuffer::Buffer& buf_and_size) {
  return std::string(reinterpret_cast<const char*>(&buf_and_size.data[0]),
                     buf_and_size.size);
}

bool TryWrite(SharedRingBuffer* wr, const char* src, size_t size) {
  SharedRingBuffer::Buffer buf;
  {
    auto lock = wr->AcquireLock(ScopedSpinlock::Mode::Try);
    if (!lock.locked())
      return false;
    buf = wr->BeginWrite(lock, size);
  }
  if (!buf)
    return false;
  memcpy(buf.data, src, size);
  wr->EndWrite(std::move(buf));
  return true;
}

void StructuredTest(SharedRingBuffer* wr, SharedRingBuffer* rd) {
  ASSERT_TRUE(wr);
  ASSERT_TRUE(wr->is_valid());
  ASSERT_TRUE(wr->size() == rd->size());
  const size_t buf_size = wr->size();

  // Test small writes.
  ASSERT_TRUE(TryWrite(wr, "foo", 4));
  ASSERT_TRUE(TryWrite(wr, "bar", 4));

  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(buf_and_size.size, 4);
    ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "foo");
    rd->EndRead(std::move(buf_and_size));
  }
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(buf_and_size.size, 4);
    ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "bar");
    rd->EndRead(std::move(buf_and_size));
  }

  for (int i = 0; i < 3; i++) {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(buf_and_size.data, nullptr);
    ASSERT_EQ(buf_and_size.size, 0);
  }

  // Test extremely large writes (fill the buffer)
  for (int i = 0; i < 3; i++) {
    // TryWrite precisely |buf_size| bytes (minus the size header itself).
    std::string data(buf_size - sizeof(uint64_t), '.' + static_cast<char>(i));
    ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
    ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
    ASSERT_FALSE(TryWrite(wr, "?", 1));

    // And read it back
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), data);
    rd->EndRead(std::move(buf_and_size));
  }

  // Test large writes that wrap.
  std::string data(buf_size / 4 * 3 - sizeof(uint64_t), '!');
  ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
  ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), data);
    rd->EndRead(std::move(buf_and_size));
  }
  data = std::string(base::kPageSize - sizeof(uint64_t), '#');
  for (int i = 0; i < 4; i++)
    ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));

  for (int i = 0; i < 4; i++) {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(buf_and_size.size, data.size());
    ASSERT_EQ(ToString(buf_and_size), data);
    rd->EndRead(std::move(buf_and_size));
  }

  // Test misaligned writes.
  ASSERT_TRUE(TryWrite(wr, "1", 1));
  ASSERT_TRUE(TryWrite(wr, "22", 2));
  ASSERT_TRUE(TryWrite(wr, "333", 3));
  ASSERT_TRUE(TryWrite(wr, "55555", 5));
  ASSERT_TRUE(TryWrite(wr, "7777777", 7));
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), "1");
    rd->EndRead(std::move(buf_and_size));
  }
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), "22");
    rd->EndRead(std::move(buf_and_size));
  }
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), "333");
    rd->EndRead(std::move(buf_and_size));
  }
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), "55555");
    rd->EndRead(std::move(buf_and_size));
  }
  {
    auto buf_and_size = rd->BeginRead();
    ASSERT_EQ(ToString(buf_and_size), "7777777");
    rd->EndRead(std::move(buf_and_size));
  }
}

TEST(SharedRingBufferTest, ReadShutdown) {
  constexpr auto kBufSize = base::kPageSize * 4;
  base::Optional<SharedRingBuffer> wr = SharedRingBuffer::Create(kBufSize);
  ASSERT_TRUE(wr);
  SharedRingBuffer rd =
      *SharedRingBuffer::Attach(base::ScopedFile(dup(wr->fd())));
  auto buf = rd.BeginRead();
  wr = base::nullopt;
  rd.EndRead(std::move(buf));
}

TEST(SharedRingBufferTest, WriteShutdown) {
  constexpr auto kBufSize = base::kPageSize * 4;
  base::Optional<SharedRingBuffer> rd = SharedRingBuffer::Create(kBufSize);
  ASSERT_TRUE(rd);
  SharedRingBuffer wr =
      *SharedRingBuffer::Attach(base::ScopedFile(dup(rd->fd())));
  SharedRingBuffer::Buffer buf;
  {
    auto lock = wr.AcquireLock(ScopedSpinlock::Mode::Blocking);
    buf = wr.BeginWrite(lock, 10);
  }
  rd = base::nullopt;
  memset(buf.data, 0, buf.size);
  wr.EndWrite(std::move(buf));
}

TEST(SharedRingBufferTest, SingleThreadSameInstance) {
  constexpr auto kBufSize = base::kPageSize * 4;
  base::Optional<SharedRingBuffer> buf = SharedRingBuffer::Create(kBufSize);
  StructuredTest(&*buf, &*buf);
}

TEST(SharedRingBufferTest, SingleThreadAttach) {
  constexpr auto kBufSize = base::kPageSize * 4;
  base::Optional<SharedRingBuffer> buf1 = SharedRingBuffer::Create(kBufSize);
  base::Optional<SharedRingBuffer> buf2 =
      SharedRingBuffer::Attach(base::ScopedFile(dup(buf1->fd())));
  StructuredTest(&*buf1, &*buf2);
}

TEST(SharedRingBufferTest, MultiThreadingTest) {
  constexpr auto kBufSize = base::kPageSize * 1024;  // 4 MB
  SharedRingBuffer rd = *SharedRingBuffer::Create(kBufSize);
  SharedRingBuffer wr =
      *SharedRingBuffer::Attach(base::ScopedFile(dup(rd.fd())));

  std::mutex mutex;
  std::unordered_map<std::string, int64_t> expected_contents;
  std::atomic<bool> writers_enabled{false};

  auto writer_thread_fn = [&wr, &expected_contents, &mutex,
                           &writers_enabled](size_t thread_id) {
    while (!writers_enabled.load()) {
    }
    std::minstd_rand0 rnd_engine(static_cast<uint32_t>(thread_id));
    std::uniform_int_distribution<size_t> dist(1, base::kPageSize * 8);
    for (int i = 0; i < 1000; i++) {
      size_t size = dist(rnd_engine);
      ASSERT_GT(size, 0);
      std::string data;
      data.resize(size);
      std::generate(data.begin(), data.end(), rnd_engine);
      if (TryWrite(&wr, data.data(), data.size())) {
        std::lock_guard<std::mutex> lock(mutex);
        expected_contents[std::move(data)]++;
      } else {
        std::this_thread::yield();
      }
    }
  };

  auto reader_thread_fn = [&rd, &expected_contents, &mutex, &writers_enabled] {
    for (;;) {
      auto buf_and_size = rd.BeginRead();
      if (!buf_and_size) {
        if (!writers_enabled.load()) {
          // Failing to read after the writers are done means that there is no
          // data left in the ring buffer.
          return;
        } else {
          std::this_thread::yield();
          continue;
        }
      }
      ASSERT_GT(buf_and_size.size, 0);
      std::string data = ToString(buf_and_size);
      std::lock_guard<std::mutex> lock(mutex);
      expected_contents[std::move(data)]--;
      rd.EndRead(std::move(buf_and_size));
    }
  };

  constexpr size_t kNumWriterThreads = 4;
  std::array<std::thread, kNumWriterThreads> writer_threads;
  for (size_t i = 0; i < kNumWriterThreads; i++)
    writer_threads[i] = std::thread(writer_thread_fn, i);

  writers_enabled.store(true);

  std::thread reader_thread(reader_thread_fn);

  for (size_t i = 0; i < kNumWriterThreads; i++)
    writer_threads[i].join();

  writers_enabled.store(false);

  reader_thread.join();
}

TEST(SharedRingBufferTest, InvalidSize) {
  constexpr auto kBufSize = base::kPageSize * 4 + 1;
  base::Optional<SharedRingBuffer> wr = SharedRingBuffer::Create(kBufSize);
  EXPECT_EQ(wr, base::nullopt);
}

TEST(SharedRingBufferTest, EmptyWrite) {
  constexpr auto kBufSize = base::kPageSize * 4;
  base::Optional<SharedRingBuffer> wr = SharedRingBuffer::Create(kBufSize);
  ASSERT_TRUE(wr);
  SharedRingBuffer::Buffer buf;
  {
    auto lock = wr->AcquireLock(ScopedSpinlock::Mode::Try);
    ASSERT_TRUE(lock.locked());
    buf = wr->BeginWrite(lock, 0);
  }
  EXPECT_TRUE(buf);
  wr->EndWrite(std::move(buf));
}

}  // namespace
}  // namespace profiling
}  // namespace perfetto