// Copyright 2015 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <brillo/streams/stream.h>
#include <algorithm>
#include <base/bind.h>
#include <brillo/message_loops/message_loop.h>
#include <brillo/pointer_utils.h>
#include <brillo/streams/stream_errors.h>
#include <brillo/streams/stream_utils.h>
namespace brillo {
// Resizes the stream to the offset currently reported by GetPosition() by
// forwarding that offset to SetSizeBlocking(). Returns whatever
// SetSizeBlocking() returns; |error| is passed through untouched.
bool Stream::TruncateBlocking(ErrorPtr* error) {
  const auto current_position = GetPosition();
  return SetSizeBlocking(current_position, error);
}
bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
return false;
return Seek(position, Whence::FROM_BEGIN, nullptr, error);
}
// Starts an asynchronous read of up to |size_to_read| bytes into |buffer|.
// |success_callback| receives the number of bytes read; the end-of-stream flag
// produced by the implementation is dropped via IgnoreEOSCallback().
// Fails immediately (returning false and filling |error|) if another
// asynchronous read is already pending.
bool Stream::ReadAsync(void* buffer,
                       size_t size_to_read,
                       const base::Callback<void(size_t)>& success_callback,
                       const ErrorCallback& error_callback,
                       ErrorPtr* error) {
  if (!is_async_read_pending_) {
    // Even when data can be read right away without blocking, the success
    // callback must still be dispatched from the main loop, hence
    // force_async_callback is true.
    const bool force_async_callback = true;
    return ReadAsyncImpl(
        buffer, size_to_read,
        base::Bind(&Stream::IgnoreEOSCallback, success_callback),
        error_callback, error, force_async_callback);
  }

  Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
               errors::stream::kOperationNotSupported,
               "Another asynchronous operation is still pending");
  return false;
}
// Starts an asynchronous read that completes only after exactly
// |size_to_read| bytes have been placed in |buffer|. Progress is tracked by
// ReadAllAsyncCallback(), which is bound through a weak pointer to this
// stream. Fails immediately (returning false and filling |error|) if another
// asynchronous read is already pending.
bool Stream::ReadAllAsync(void* buffer,
                          size_t size_to_read,
                          const base::Closure& success_callback,
                          const ErrorCallback& error_callback,
                          ErrorPtr* error) {
  if (!is_async_read_pending_) {
    // The first completion is always delivered from the main loop.
    const bool force_async_callback = true;
    return ReadAsyncImpl(
        buffer, size_to_read,
        base::Bind(&Stream::ReadAllAsyncCallback,
                   weak_ptr_factory_.GetWeakPtr(), buffer, size_to_read,
                   success_callback, error_callback),
        error_callback, error, force_async_callback);
  }

  Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
               errors::stream::kOperationNotSupported,
               "Another asynchronous operation is still pending");
  return false;
}
bool Stream::ReadBlocking(void* buffer,
size_t size_to_read,
size_t* size_read,
ErrorPtr* error) {
for (;;) {
bool eos = false;
if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
return false;
if (*size_read > 0 || eos)
break;
if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
error)) {
return false;
}
}
return true;
}
bool Stream::ReadAllBlocking(void* buffer,
size_t size_to_read,
ErrorPtr* error) {
while (size_to_read > 0) {
size_t size_read = 0;
if (!ReadBlocking(buffer, size_to_read, &size_read, error))
return false;
if (size_read == 0)
return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);
size_to_read -= size_read;
buffer = AdvancePointer(buffer, size_read);
}
return true;
}
// Starts an asynchronous write of up to |size_to_write| bytes from |buffer|.
// |success_callback| receives the number of bytes written.
// Fails immediately (returning false and filling |error|) if another
// asynchronous write is already pending.
bool Stream::WriteAsync(const void* buffer,
                        size_t size_to_write,
                        const base::Callback<void(size_t)>& success_callback,
                        const ErrorCallback& error_callback,
                        ErrorPtr* error) {
  if (!is_async_write_pending_) {
    // Even when some data can be written right away without blocking, the
    // success callback must still be dispatched from the main loop, hence
    // force_async_callback is true.
    const bool force_async_callback = true;
    return WriteAsyncImpl(buffer, size_to_write, success_callback,
                          error_callback, error, force_async_callback);
  }

  Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
               errors::stream::kOperationNotSupported,
               "Another asynchronous operation is still pending");
  return false;
}
// Starts an asynchronous write that completes only after all |size_to_write|
// bytes from |buffer| have been written. Progress is tracked by
// WriteAllAsyncCallback(), which is bound through a weak pointer to this
// stream. Fails immediately (returning false and filling |error|) if another
// asynchronous write is already pending.
bool Stream::WriteAllAsync(const void* buffer,
                           size_t size_to_write,
                           const base::Closure& success_callback,
                           const ErrorCallback& error_callback,
                           ErrorPtr* error) {
  if (!is_async_write_pending_) {
    // The first completion is always delivered from the main loop.
    const bool force_async_callback = true;
    return WriteAsyncImpl(
        buffer, size_to_write,
        base::Bind(&Stream::WriteAllAsyncCallback,
                   weak_ptr_factory_.GetWeakPtr(), buffer, size_to_write,
                   success_callback, error_callback),
        error_callback, error, force_async_callback);
  }

  Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
               errors::stream::kOperationNotSupported,
               "Another asynchronous operation is still pending");
  return false;
}
bool Stream::WriteBlocking(const void* buffer,
size_t size_to_write,
size_t* size_written,
ErrorPtr* error) {
for (;;) {
if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
return false;
if (*size_written > 0 || size_to_write == 0)
break;
if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
error)) {
return false;
}
}
return true;
}
bool Stream::WriteAllBlocking(const void* buffer,
size_t size_to_write,
ErrorPtr* error) {
while (size_to_write > 0) {
size_t size_written = 0;
if (!WriteBlocking(buffer, size_to_write, &size_written, error))
return false;
if (size_written == 0) {
Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
errors::stream::kPartialData,
"Failed to write all the data");
return false;
}
size_to_write -= size_written;
buffer = AdvancePointer(buffer, size_written);
}
return true;
}
// Schedules a flush on the current message loop. The actual work is done by
// FlushAsyncCallback(), bound through a weak pointer to this stream, which
// later reports the outcome through |success_callback| or |error_callback|.
// Always returns true; the |error| parameter is unused.
bool Stream::FlushAsync(const base::Closure& success_callback,
                        const ErrorCallback& error_callback,
                        ErrorPtr* /* error */) {
  MessageLoop::current()->PostTask(
      FROM_HERE,
      base::Bind(&Stream::FlushAsyncCallback, weak_ptr_factory_.GetWeakPtr(),
                 success_callback, error_callback));
  return true;
}
// Adapter that turns a (size, end-of-stream) completion into a size-only
// completion: the end-of-stream flag is discarded and |success_callback| is
// run with the byte count alone.
void Stream::IgnoreEOSCallback(
    const base::Callback<void(size_t)>& success_callback,
    size_t bytes,
    bool /* eos */) {
  const size_t byte_count = bytes;
  success_callback.Run(byte_count);
}
// Core of the asynchronous read machinery.
//
// Attempts a non-blocking read of up to |size_to_read| bytes into |buffer|.
//  - If data was read or end of stream was reached, |success_callback| is
//    invoked with (bytes read, eos): posted to the current message loop via
//    OnReadAsyncDone() when |force_async_callback| is true, or run
//    synchronously otherwise.
//  - If nothing is available yet, WaitForData() is armed with
//    OnReadAvailable(), which retries this function later.
//
// Returns false (with |error| filled in by the failing call) when either
// ReadNonBlocking() or WaitForData() fails. In that case no read is left
// pending, so the caller may start a new asynchronous read.
//
// Must not be called while another asynchronous read is pending (CHECKed).
bool Stream::ReadAsyncImpl(
    void* buffer,
    size_t size_to_read,
    const base::Callback<void(size_t, bool)>& success_callback,
    const ErrorCallback& error_callback,
    ErrorPtr* error,
    bool force_async_callback) {
  CHECK(!is_async_read_pending_);
  // Mark the read as pending before touching the stream, so that any
  // ReadAsync()/ReadAllAsync() call made while ReadNonBlocking() or
  // WaitForData() is running is rejected instead of starting a second read.
  is_async_read_pending_ = true;

  size_t read = 0;
  bool eos = false;
  if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error)) {
    // Nothing has been scheduled, so the operation is no longer pending.
    // Without this reset the stream would reject every subsequent
    // asynchronous read after a single failure.
    is_async_read_pending_ = false;
    return false;
  }

  if (read > 0 || eos) {
    if (force_async_callback) {
      // The flag stays set until OnReadAsyncDone() runs from the main loop.
      MessageLoop::current()->PostTask(
          FROM_HERE,
          base::Bind(&Stream::OnReadAsyncDone, weak_ptr_factory_.GetWeakPtr(),
                     success_callback, read, eos));
    } else {
      // Clear the flag first so the callback may start another read.
      is_async_read_pending_ = false;
      success_callback.Run(read, eos);
    }
    return true;
  }

  // No data yet: the read remains pending only if the wait was armed.
  is_async_read_pending_ = WaitForData(
      AccessMode::READ,
      base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
                 buffer, size_to_read, success_callback, error_callback),
      error);
  return is_async_read_pending_;
}
// Task body posted by ReadAsyncImpl() when a completed read has to be reported
// from the main loop. Clears the pending-read flag *before* running
// |success_callback| so the callback is free to start another read.
void Stream::OnReadAsyncDone(
    const base::Callback<void(size_t, bool)>& success_callback,
    size_t bytes_read,
    bool eos) {
  const size_t byte_count = bytes_read;
  const bool end_of_stream = eos;
  is_async_read_pending_ = false;
  success_callback.Run(byte_count, end_of_stream);
}
// Callback armed by ReadAsyncImpl() through WaitForData(). |mode| must be a
// read access mode and a read must currently be pending (both CHECKed).
// Retries the read via ReadAsyncImpl(); if that fails, the error is delivered
// through |error_callback|.
void Stream::OnReadAvailable(
    void* buffer,
    size_t size_to_read,
    const base::Callback<void(size_t, bool)>& success_callback,
    const ErrorCallback& error_callback,
    AccessMode mode) {
  CHECK(stream_utils::IsReadAccessMode(mode));
  CHECK(is_async_read_pending_);
  // ReadAsyncImpl() requires the flag to be clear on entry.
  is_async_read_pending_ = false;

  // We are already running inside a callback, so the success callback can be
  // invoked directly rather than being posted to the main loop again.
  const bool force_async_callback = false;
  ErrorPtr error;
  const bool rescheduled =
      ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
                    &error, force_async_callback);
  if (rescheduled)
    return;
  error_callback.Run(error.get());
}
// Core of the asynchronous write machinery.
//
// Attempts a non-blocking write of up to |size_to_write| bytes from |buffer|.
//  - If some data was written, or nothing was requested
//    (|size_to_write| == 0), |success_callback| is invoked with the number of
//    bytes written: posted to the current message loop via OnWriteAsyncDone()
//    when |force_async_callback| is true, or run synchronously otherwise.
//  - If the stream accepted nothing, WaitForData() is armed with
//    OnWriteAvailable(), which retries this function later.
//
// Returns false (with |error| filled in by the failing call) when either
// WriteNonBlocking() or WaitForData() fails. In that case no write is left
// pending, so the caller may start a new asynchronous write.
//
// Must not be called while another asynchronous write is pending (CHECKed).
bool Stream::WriteAsyncImpl(
    const void* buffer,
    size_t size_to_write,
    const base::Callback<void(size_t)>& success_callback,
    const ErrorCallback& error_callback,
    ErrorPtr* error,
    bool force_async_callback) {
  CHECK(!is_async_write_pending_);
  // Mark the write as pending before touching the stream, so that any
  // WriteAsync()/WriteAllAsync() call made while WriteNonBlocking() or
  // WaitForData() is running is rejected instead of starting a second write.
  is_async_write_pending_ = true;

  size_t written = 0;
  if (!WriteNonBlocking(buffer, size_to_write, &written, error)) {
    // Nothing has been scheduled, so the operation is no longer pending.
    // Without this reset the stream would reject every subsequent
    // asynchronous write after a single failure.
    is_async_write_pending_ = false;
    return false;
  }

  // A zero-length request is complete as soon as WriteNonBlocking() succeeds
  // (matching WriteBlocking()). Waiting for writability instead would re-arm
  // WaitForData() forever, because |written| can never become positive.
  if (written > 0 || size_to_write == 0) {
    if (force_async_callback) {
      // The flag stays set until OnWriteAsyncDone() runs from the main loop.
      MessageLoop::current()->PostTask(
          FROM_HERE,
          base::Bind(&Stream::OnWriteAsyncDone, weak_ptr_factory_.GetWeakPtr(),
                     success_callback, written));
    } else {
      // Clear the flag first so the callback may start another write.
      is_async_write_pending_ = false;
      success_callback.Run(written);
    }
    return true;
  }

  // Nothing accepted yet: the write remains pending only if the wait was
  // armed.
  is_async_write_pending_ = WaitForData(
      AccessMode::WRITE,
      base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
                 buffer, size_to_write, success_callback, error_callback),
      error);
  return is_async_write_pending_;
}
// Task body posted by WriteAsyncImpl() when a completed write has to be
// reported from the main loop. Clears the pending-write flag *before* running
// |success_callback| so the callback is free to start another write.
void Stream::OnWriteAsyncDone(
    const base::Callback<void(size_t)>& success_callback,
    size_t size_written) {
  const size_t byte_count = size_written;
  is_async_write_pending_ = false;
  success_callback.Run(byte_count);
}
// Callback armed by WriteAsyncImpl() through WaitForData(). |mode| must be a
// write access mode and a write must currently be pending (both CHECKed).
// Retries the write via WriteAsyncImpl(); if that fails, the error is
// delivered through |error_callback|.
void Stream::OnWriteAvailable(
    const void* buffer,
    size_t size,
    const base::Callback<void(size_t)>& success_callback,
    const ErrorCallback& error_callback,
    AccessMode mode) {
  CHECK(stream_utils::IsWriteAccessMode(mode));
  CHECK(is_async_write_pending_);
  // WriteAsyncImpl() requires the flag to be clear on entry.
  is_async_write_pending_ = false;

  // We are already running inside a callback, so the success callback can be
  // invoked directly rather than being posted to the main loop again.
  const bool force_async_callback = false;
  ErrorPtr error;
  const bool rescheduled =
      WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
                     force_async_callback);
  if (rescheduled)
    return;
  error_callback.Run(error.get());
}
// Completion handler for each partial read of a ReadAllAsync() operation.
// |size_to_read| is the amount that was still outstanding when the partial
// read was issued and |size_read| is what that read delivered.
//  - Everything delivered: run |success_callback|.
//  - Data still owed but end of stream reached: report a
//    read-past-end-of-stream error through |error_callback|.
//  - Otherwise: issue another read for the remainder, handled by this same
//    callback.
void Stream::ReadAllAsyncCallback(void* buffer,
                                  size_t size_to_read,
                                  const base::Closure& success_callback,
                                  const ErrorCallback& error_callback,
                                  size_t size_read,
                                  bool eos) {
  ErrorPtr error;
  const size_t remaining = size_to_read - size_read;

  if (remaining == 0) {
    success_callback.Run();
    return;
  }

  if (eos) {
    stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
    error_callback.Run(error.get());
    return;
  }

  void* next_chunk = AdvancePointer(buffer, size_read);
  auto continuation = base::Bind(&Stream::ReadAllAsyncCallback,
                                 weak_ptr_factory_.GetWeakPtr(), next_chunk,
                                 remaining, success_callback, error_callback);
  // Already running inside a callback, so no need to bounce through the main
  // loop for the next completion.
  const bool scheduled = ReadAsyncImpl(next_chunk, remaining, continuation,
                                       error_callback, &error, false);
  if (!scheduled)
    error_callback.Run(error.get());
}
// Completion handler for each partial write of a WriteAllAsync() operation.
// |size_to_write| is the amount that was still outstanding when the partial
// write was issued and |size_written| is what that write consumed.
//  - Data outstanding but nothing written: report a kPartialData error
//    through |error_callback|.
//  - Everything written: run |success_callback|.
//  - Otherwise: issue another write for the remainder, handled by this same
//    callback.
void Stream::WriteAllAsyncCallback(const void* buffer,
                                   size_t size_to_write,
                                   const base::Closure& success_callback,
                                   const ErrorCallback& error_callback,
                                   size_t size_written) {
  ErrorPtr error;

  const bool no_progress = (size_written == 0) && (size_to_write != 0);
  if (no_progress) {
    Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
                 errors::stream::kPartialData, "Failed to write all the data");
    error_callback.Run(error.get());
    return;
  }

  const size_t remaining = size_to_write - size_written;
  if (remaining == 0) {
    success_callback.Run();
    return;
  }

  const void* next_chunk = AdvancePointer(buffer, size_written);
  auto continuation = base::Bind(&Stream::WriteAllAsyncCallback,
                                 weak_ptr_factory_.GetWeakPtr(), next_chunk,
                                 remaining, success_callback, error_callback);
  // Already running inside a callback, so no need to bounce through the main
  // loop for the next completion.
  const bool scheduled = WriteAsyncImpl(next_chunk, remaining, continuation,
                                        error_callback, &error, false);
  if (!scheduled)
    error_callback.Run(error.get());
}
// Task body posted by FlushAsync(). Performs the flush with FlushBlocking()
// and reports the outcome: |error_callback| with the produced error on
// failure, |success_callback| otherwise.
void Stream::FlushAsyncCallback(const base::Closure& success_callback,
                                const ErrorCallback& error_callback) {
  ErrorPtr error;
  const bool flushed = FlushBlocking(&error);
  if (!flushed) {
    error_callback.Run(error.get());
    return;
  }
  success_callback.Run();
}
// Abandons every outstanding asynchronous operation. Invalidating the weak
// pointers disarms all callbacks that were bound through
// |weak_ptr_factory_|; both pending flags are then cleared so new
// asynchronous reads and writes can be started.
void Stream::CancelPendingAsyncOperations() {
  weak_ptr_factory_.InvalidateWeakPtrs();
  is_async_read_pending_ = is_async_write_pending_ = false;
}
} // namespace brillo