// Copyright (c) 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "content/browser/streams/stream.h" #include "base/bind.h" #include "base/location.h" #include "base/message_loop/message_loop_proxy.h" #include "base/values.h" #include "content/browser/streams/stream_handle_impl.h" #include "content/browser/streams/stream_read_observer.h" #include "content/browser/streams/stream_registry.h" #include "content/browser/streams/stream_write_observer.h" #include "net/base/io_buffer.h" #include "net/http/http_response_headers.h" namespace { // Start throttling the connection at about 1MB. const size_t kDeferSizeThreshold = 40 * 32768; } namespace content { Stream::Stream(StreamRegistry* registry, StreamWriteObserver* write_observer, const GURL& url) : can_add_data_(true), url_(url), data_length_(0), data_bytes_read_(0), last_total_buffered_bytes_(0), registry_(registry), read_observer_(NULL), write_observer_(write_observer), stream_handle_(NULL), weak_ptr_factory_(this) { CreateByteStream(base::MessageLoopProxy::current(), base::MessageLoopProxy::current(), kDeferSizeThreshold, &writer_, &reader_); // Setup callback for writing. writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, weak_ptr_factory_.GetWeakPtr())); reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); registry_->RegisterStream(this); } Stream::~Stream() { } bool Stream::SetReadObserver(StreamReadObserver* observer) { if (read_observer_) return false; read_observer_ = observer; return true; } void Stream::RemoveReadObserver(StreamReadObserver* observer) { DCHECK(observer == read_observer_); read_observer_ = NULL; } void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { DCHECK(observer == write_observer_); write_observer_ = NULL; } void Stream::Abort() { // Clear all buffer. It's safe to clear reader_ here since the same thread // is used for both input and output operation. writer_.reset(); reader_.reset(); ClearBuffer(); can_add_data_ = false; registry_->UnregisterStream(url()); } void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { if (!writer_.get()) return; size_t current_buffered_bytes = writer_->GetTotalBufferedBytes(); if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) { Abort(); return; } // Now it's guaranteed that this doesn't overflow. This must be done before // Write() since GetTotalBufferedBytes() may return different value after // Write() call, so if we use the new value, information in this instance and // one in |registry_| become inconsistent. last_total_buffered_bytes_ = current_buffered_bytes + size; can_add_data_ = writer_->Write(buffer, size); } void Stream::AddData(const char* data, size_t size) { if (!writer_.get()) return; scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); memcpy(io_buffer->data(), data, size); AddData(io_buffer, size); } void Stream::Finalize() { if (!writer_.get()) return; writer_->Close(0); writer_.reset(); // Continue asynchronously. base::MessageLoopProxy::current()->PostTask( FROM_HERE, base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); } Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) { DCHECK(buf); DCHECK(bytes_read); *bytes_read = 0; if (!data_.get()) { DCHECK(!data_length_); DCHECK(!data_bytes_read_); if (!reader_.get()) return STREAM_ABORTED; ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); switch (state) { case ByteStreamReader::STREAM_HAS_DATA: break; case ByteStreamReader::STREAM_COMPLETE: registry_->UnregisterStream(url()); return STREAM_COMPLETE; case ByteStreamReader::STREAM_EMPTY: return STREAM_EMPTY; } } const size_t remaining_bytes = data_length_ - data_bytes_read_; size_t to_read = static_cast<size_t>(buf_size) < remaining_bytes ? buf_size : remaining_bytes; memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); data_bytes_read_ += to_read; if (data_bytes_read_ >= data_length_) ClearBuffer(); *bytes_read = to_read; return STREAM_HAS_DATA; } scoped_ptr<StreamHandle> Stream::CreateHandle( const GURL& original_url, const std::string& mime_type, scoped_refptr<net::HttpResponseHeaders> response_headers) { CHECK(!stream_handle_); stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), original_url, mime_type, response_headers); return scoped_ptr<StreamHandle>(stream_handle_).Pass(); } void Stream::CloseHandle() { // Prevent deletion until this function ends. scoped_refptr<Stream> ref(this); CHECK(stream_handle_); stream_handle_ = NULL; registry_->UnregisterStream(url()); if (write_observer_) write_observer_->OnClose(this); } void Stream::OnSpaceAvailable() { can_add_data_ = true; if (write_observer_) write_observer_->OnSpaceAvailable(this); } void Stream::OnDataAvailable() { if (read_observer_) read_observer_->OnDataAvailable(this); } void Stream::ClearBuffer() { data_ = NULL; data_length_ = 0; data_bytes_read_ = 0; } } // namespace content