// Copyright 2015 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <brillo/streams/fake_stream.h>
#include <algorithm>
#include <base/bind.h>
#include <brillo/message_loops/message_loop.h>
#include <brillo/streams/stream_utils.h>
namespace brillo {
namespace {
// Gets a delta between the two times, makes sure that the delta is positive.
base::TimeDelta CalculateDelay(const base::Time& now,
const base::Time& delay_until) {
const base::TimeDelta zero_delay;
if (delay_until.is_null() || now >= delay_until) {
return zero_delay;
}
base::TimeDelta delay = delay_until - now;
if (delay < zero_delay)
delay = zero_delay;
return delay;
}
// Given the current clock time, and expected delays for read and write
// operations calculates the smaller wait delay of the two and sets the
// resulting operation to |*mode| and the delay to wait for into |*delay|.
void GetMinDelayAndMode(const base::Time& now,
bool read, const base::Time& delay_read_until,
bool write, const base::Time& delay_write_until,
Stream::AccessMode* mode, base::TimeDelta* delay) {
base::TimeDelta read_delay = base::TimeDelta::Max();
base::TimeDelta write_delay = base::TimeDelta::Max();
if (read)
read_delay = CalculateDelay(now, delay_read_until);
if (write)
write_delay = CalculateDelay(now, delay_write_until);
if (read_delay > write_delay) {
read = false;
} else if (read_delay < write_delay) {
write = false;
}
*mode = stream_utils::MakeAccessMode(read, write);
*delay = std::min(read_delay, write_delay);
}
} // anonymous namespace
FakeStream::FakeStream(Stream::AccessMode mode,
base::Clock* clock)
: mode_{mode}, clock_{clock} {}
void FakeStream::AddReadPacketData(base::TimeDelta delay,
const void* data,
size_t size) {
auto* byte_ptr = static_cast<const uint8_t*>(data);
AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
}
void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
InputDataPacket packet;
packet.data = std::move(data);
packet.delay_before = delay;
incoming_queue_.push(std::move(packet));
}
void FakeStream::AddReadPacketString(base::TimeDelta delay,
const std::string& data) {
AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
}
void FakeStream::QueueReadError(base::TimeDelta delay) {
QueueReadErrorWithMessage(delay, std::string{});
}
void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
const std::string& message) {
InputDataPacket packet;
packet.data.assign(message.begin(), message.end());
packet.delay_before = delay;
packet.read_error = true;
incoming_queue_.push(std::move(packet));
}
void FakeStream::ClearReadQueue() {
std::queue<InputDataPacket>().swap(incoming_queue_);
delay_input_until_ = base::Time{};
input_buffer_.clear();
input_ptr_ = 0;
report_read_error_ = 0;
}
void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
size_t data_size) {
OutputDataPacket packet;
packet.expected_size = data_size;
packet.delay_before = delay;
outgoing_queue_.push(std::move(packet));
}
void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
const void* data,
size_t size) {
auto* byte_ptr = static_cast<const uint8_t*>(data);
ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
}
void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
brillo::Blob data) {
OutputDataPacket packet;
packet.expected_size = data.size();
packet.data = std::move(data);
packet.delay_before = delay;
outgoing_queue_.push(std::move(packet));
}
void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
const std::string& data) {
ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
}
void FakeStream::QueueWriteError(base::TimeDelta delay) {
QueueWriteErrorWithMessage(delay, std::string{});
}
void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
const std::string& message) {
OutputDataPacket packet;
packet.expected_size = 0;
packet.data.assign(message.begin(), message.end());
packet.delay_before = delay;
packet.write_error = true;
outgoing_queue_.push(std::move(packet));
}
void FakeStream::ClearWriteQueue() {
std::queue<OutputDataPacket>().swap(outgoing_queue_);
delay_output_until_ = base::Time{};
output_buffer_.clear();
expected_output_data_.clear();
max_output_buffer_size_ = 0;
all_output_data_.clear();
report_write_error_ = 0;
}
const brillo::Blob& FakeStream::GetFlushedOutputData() const {
return all_output_data_;
}
std::string FakeStream::GetFlushedOutputDataAsString() const {
return std::string{all_output_data_.begin(), all_output_data_.end()};
}
bool FakeStream::CanRead() const {
return stream_utils::IsReadAccessMode(mode_);
}
bool FakeStream::CanWrite() const {
return stream_utils::IsWriteAccessMode(mode_);
}
bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
}
bool FakeStream::Seek(int64_t /* offset */,
Whence /* whence */,
uint64_t* /* new_position */,
ErrorPtr* error) {
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
}
bool FakeStream::IsReadBufferEmpty() const {
return input_ptr_ >= input_buffer_.size();
}
bool FakeStream::PopReadPacket() {
if (incoming_queue_.empty())
return false;
const InputDataPacket& packet = incoming_queue_.front();
input_ptr_ = 0;
input_buffer_ = std::move(packet.data);
delay_input_until_ = clock_->Now() + packet.delay_before;
incoming_queue_.pop();
report_read_error_ = packet.read_error;
return true;
}
bool FakeStream::ReadNonBlocking(void* buffer,
size_t size_to_read,
size_t* size_read,
bool* end_of_stream,
ErrorPtr* error) {
if (!CanRead())
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
if (!IsOpen())
return stream_utils::ErrorStreamClosed(FROM_HERE, error);
for (;;) {
if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
*size_read = 0;
if (end_of_stream)
*end_of_stream = false;
break;
}
if (report_read_error_) {
report_read_error_ = false;
std::string message{input_buffer_.begin(), input_buffer_.end()};
if (message.empty())
message = "Simulating read error for tests";
input_buffer_.clear();
Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
return false;
}
if (!IsReadBufferEmpty()) {
size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
input_ptr_ += size_to_read;
*size_read = size_to_read;
if (end_of_stream)
*end_of_stream = false;
break;
}
if (!PopReadPacket()) {
*size_read = 0;
if (end_of_stream)
*end_of_stream = true;
break;
}
}
return true;
}
bool FakeStream::IsWriteBufferFull() const {
return output_buffer_.size() >= max_output_buffer_size_;
}
bool FakeStream::PopWritePacket() {
if (outgoing_queue_.empty())
return false;
const OutputDataPacket& packet = outgoing_queue_.front();
expected_output_data_ = std::move(packet.data);
delay_output_until_ = clock_->Now() + packet.delay_before;
max_output_buffer_size_ = packet.expected_size;
report_write_error_ = packet.write_error;
outgoing_queue_.pop();
return true;
}
bool FakeStream::WriteNonBlocking(const void* buffer,
size_t size_to_write,
size_t* size_written,
ErrorPtr* error) {
if (!CanWrite())
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
if (!IsOpen())
return stream_utils::ErrorStreamClosed(FROM_HERE, error);
for (;;) {
if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
*size_written = 0;
return true;
}
if (report_write_error_) {
report_write_error_ = false;
std::string message{expected_output_data_.begin(),
expected_output_data_.end()};
if (message.empty())
message = "Simulating write error for tests";
output_buffer_.clear();
max_output_buffer_size_ = 0;
expected_output_data_.clear();
Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
return false;
}
if (!IsWriteBufferFull()) {
bool success = true;
size_to_write = std::min(size_to_write,
max_output_buffer_size_ - output_buffer_.size());
auto byte_ptr = static_cast<const uint8_t*>(buffer);
output_buffer_.insert(output_buffer_.end(),
byte_ptr, byte_ptr + size_to_write);
if (output_buffer_.size() == max_output_buffer_size_) {
if (!expected_output_data_.empty() &&
expected_output_data_ != output_buffer_) {
// We expected different data to be written, report an error.
Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
"Unexpected data written");
success = false;
}
all_output_data_.insert(all_output_data_.end(),
output_buffer_.begin(), output_buffer_.end());
output_buffer_.clear();
max_output_buffer_size_ = 0;
expected_output_data_.clear();
}
*size_written = size_to_write;
return success;
}
if (!PopWritePacket()) {
// No more data expected.
Error::AddTo(error, FROM_HERE, "fake_stream", "full",
"No more output data expected");
return false;
}
}
}
bool FakeStream::FlushBlocking(ErrorPtr* error) {
if (!CanWrite())
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
if (!IsOpen())
return stream_utils::ErrorStreamClosed(FROM_HERE, error);
bool success = true;
if (!output_buffer_.empty()) {
if (!expected_output_data_.empty() &&
expected_output_data_ != output_buffer_) {
// We expected different data to be written, report an error.
Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
"Unexpected data written");
success = false;
}
all_output_data_.insert(all_output_data_.end(),
output_buffer_.begin(), output_buffer_.end());
output_buffer_.clear();
max_output_buffer_size_ = 0;
expected_output_data_.clear();
}
return success;
}
bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
is_open_ = false;
return true;
}
bool FakeStream::WaitForData(AccessMode mode,
const base::Callback<void(AccessMode)>& callback,
ErrorPtr* error) {
bool read_requested = stream_utils::IsReadAccessMode(mode);
bool write_requested = stream_utils::IsWriteAccessMode(mode);
if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
if (read_requested && IsReadBufferEmpty())
PopReadPacket();
if (write_requested && IsWriteBufferFull())
PopWritePacket();
base::TimeDelta delay;
GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
write_requested, delay_output_until_, &mode, &delay);
MessageLoop::current()->PostDelayedTask(
FROM_HERE, base::Bind(callback, mode), delay);
return true;
}
bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
base::TimeDelta timeout,
AccessMode* out_mode,
ErrorPtr* error) {
const base::TimeDelta zero_delay;
bool read_requested = stream_utils::IsReadAccessMode(in_mode);
bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
base::TimeDelta delay;
GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
write_requested, delay_output_until_, out_mode, &delay);
if (timeout < delay)
return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
<< " ms.";
return true;
}
} // namespace brillo