// Copyright 2015 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include <brillo/streams/input_stream_set.h> #include <base/bind.h> #include <brillo/message_loops/message_loop.h> #include <brillo/streams/stream_errors.h> #include <brillo/streams/stream_utils.h> namespace brillo { InputStreamSet::InputStreamSet( std::vector<Stream*> source_streams, std::vector<StreamPtr> owned_source_streams, uint64_t initial_stream_size) : source_streams_{std::move(source_streams)}, owned_source_streams_{std::move(owned_source_streams)}, initial_stream_size_{initial_stream_size} {} StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams, std::vector<StreamPtr> owned_source_streams, ErrorPtr* error) { StreamPtr stream; if (source_streams.empty()) { Error::AddTo(error, FROM_HERE, errors::stream::kDomain, errors::stream::kInvalidParameter, "Source stream list is empty"); return stream; } // Make sure we have only readable streams. for (Stream* src_stream : source_streams) { if (!src_stream->CanRead()) { Error::AddTo(error, FROM_HERE, errors::stream::kDomain, errors::stream::kInvalidParameter, "The stream list must contain only readable streams"); return stream; } } // We are using remaining size here because the multiplexed stream is not // seekable and the bytes already read are essentially "lost" as far as this // stream is concerned. uint64_t initial_stream_size = 0; for (const Stream* stream : source_streams) initial_stream_size += stream->GetRemainingSize(); stream.reset(new InputStreamSet{std::move(source_streams), std::move(owned_source_streams), initial_stream_size}); return stream; } StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams, ErrorPtr* error) { return Create(std::move(source_streams), {}, error); } StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams, ErrorPtr* error) { std::vector<Stream*> source_streams; source_streams.reserve(owned_source_streams.size()); for (const StreamPtr& stream : owned_source_streams) source_streams.push_back(stream.get()); return Create(std::move(source_streams), std::move(owned_source_streams), error); } bool InputStreamSet::IsOpen() const { return !closed_; } bool InputStreamSet::CanGetSize() const { bool can_get_size = IsOpen(); for (const Stream* stream : source_streams_) { if (!stream->CanGetSize()) { can_get_size = false; break; } } return can_get_size; } uint64_t InputStreamSet::GetSize() const { return initial_stream_size_; } bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) { return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); } uint64_t InputStreamSet::GetRemainingSize() const { uint64_t size = 0; for (const Stream* stream : source_streams_) size += stream->GetRemainingSize(); return size; } bool InputStreamSet::Seek(int64_t /* offset */, Whence /* whence */, uint64_t* /* new_position */, ErrorPtr* error) { return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); } bool InputStreamSet::ReadNonBlocking(void* buffer, size_t size_to_read, size_t* size_read, bool* end_of_stream, ErrorPtr* error) { if (!IsOpen()) return stream_utils::ErrorStreamClosed(FROM_HERE, error); while (!source_streams_.empty()) { Stream* stream = source_streams_.front(); bool eos = false; if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error)) return false; if (*size_read > 0 || !eos) { if (end_of_stream) *end_of_stream = false; return true; } source_streams_.erase(source_streams_.begin()); } *size_read = 0; if (end_of_stream) *end_of_stream = true; return true; } bool InputStreamSet::WriteNonBlocking(const void* /* buffer */, size_t /* size_to_write */, size_t* /* size_written */, ErrorPtr* error) { return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); } bool InputStreamSet::CloseBlocking(ErrorPtr* error) { bool success = true; // We want to close only the owned streams. for (StreamPtr& stream_ptr : owned_source_streams_) { if (!stream_ptr->CloseBlocking(error)) success = false; // Keep going for other streams... } owned_source_streams_.clear(); source_streams_.clear(); initial_stream_size_ = 0; closed_ = true; return success; } bool InputStreamSet::WaitForData( AccessMode mode, const base::Callback<void(AccessMode)>& callback, ErrorPtr* error) { if (!IsOpen()) return stream_utils::ErrorStreamClosed(FROM_HERE, error); if (stream_utils::IsWriteAccessMode(mode)) return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); if (!source_streams_.empty()) { Stream* stream = source_streams_.front(); return stream->WaitForData(mode, callback, error); } MessageLoop::current()->PostTask(FROM_HERE, base::Bind(callback, mode)); return true; } bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode, base::TimeDelta timeout, AccessMode* out_mode, ErrorPtr* error) { if (!IsOpen()) return stream_utils::ErrorStreamClosed(FROM_HERE, error); if (stream_utils::IsWriteAccessMode(in_mode)) return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); if (!source_streams_.empty()) { Stream* stream = source_streams_.front(); return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error); } if (out_mode) *out_mode = in_mode; return true; } void InputStreamSet::CancelPendingAsyncOperations() { if (IsOpen() && !source_streams_.empty()) { Stream* stream = source_streams_.front(); stream->CancelPendingAsyncOperations(); } Stream::CancelPendingAsyncOperations(); } } // namespace brillo