// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/byte_stream.h"
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include "base/bind.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
namespace content {
namespace {
// A queue of (buffer, byte count) pairs. Each entry is one chunk of stream
// data together with the number of bytes the caller reported for it.
typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
ContentVector;
// Forward declaration; ByteStreamWriterImpl stores a pointer to its reader
// peer before the reader class is defined.
class ByteStreamReaderImpl;
// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
// cleared in an object destructor and accessed to check for object
// existence. We can't use weak pointers because they're tightly tied to
// threads rather than task runners.
// TODO(rdsmith): A better solution would be extending weak pointers
// to support SequencedTaskRunners.
//
// Usage in this file: each stream endpoint keeps a reference to its own
// flag and sets |is_alive| to false in its destructor. Tasks posted by the
// peer carry a reference to the same flag and check it before touching the
// endpoint through a raw pointer.
struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
public:
// A freshly created flag reports its object as alive.
LifetimeFlag() : is_alive(true) { }
// Plain bool with no synchronization of its own.
bool is_alive;
protected:
// The ref-counting base class is befriended so that it can reach the
// non-public destructor.
friend class base::RefCountedThreadSafe<LifetimeFlag>;
virtual ~LifetimeFlag() { }
private:
DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
};
// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
// SetPeer may happen anywhere; all other operations on each class must
// happen in the context of their SequencedTaskRunner.
//
// ByteStreamWriterImpl is the producing end of a byte stream. It collects
// written buffers locally and forwards them in batches to its reader peer by
// posting tasks to the peer's task runner.
class ByteStreamWriterImpl : public ByteStreamWriter {
public:
// |task_runner| is the runner this object is used on, |lifetime_flag| is
// the flag this object marks alive on construction and dead on destruction,
// and |buffer_size| is the flow-control limit for buffered bytes.
ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<LifetimeFlag> lifetime_flag,
size_t buffer_size);
virtual ~ByteStreamWriterImpl();
// Must be called before any operations are performed.
// Records the reader peer, the task runner it lives on and its lifetime
// flag.
void SetPeer(ByteStreamReaderImpl* peer,
scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
scoped_refptr<LifetimeFlag> peer_lifetime_flag);
// Overridden from ByteStreamWriter.
// Write() queues |buffer|; it returns false when the write is rejected to
// avoid size_t overflow, or when the total buffered byte count exceeds the
// configured buffer size after the write.
virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
size_t byte_count) OVERRIDE;
// Forwards any locally queued data to the peer.
virtual void Flush() OVERRIDE;
// Forwards any locally queued data together with the completion |status|.
virtual void Close(int status) OVERRIDE;
// Stores the callback run when buffered bytes drop from above the limit to
// at or below it.
virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
// Returns locally queued bytes plus bytes sent but not yet reported as
// consumed.
virtual size_t GetTotalBufferedBytes() const OVERRIDE;
// PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
// Static so that it can run after |target| has been destroyed; |target| is
// only touched if |lifetime_flag| still reports it alive.
static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
ByteStreamWriterImpl* target,
size_t bytes_consumed);
private:
// Called from UpdateWindow when object existence has been validated.
void UpdateWindowInternal(size_t bytes_consumed);
// Posts the locally queued data (if any) to the peer; |complete| and
// |status| are passed through to the reader.
void PostToPeer(bool complete, int status);
// Flow-control limit on buffered bytes, fixed at construction.
const size_t total_buffer_size_;
// All data objects in this class are only valid to access on
// this task runner except as otherwise noted.
scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
// True while this object is alive.
scoped_refptr<LifetimeFlag> my_lifetime_flag_;
// Callback registered through RegisterCallback(); may be null.
base::Closure space_available_callback_;
// Buffers written but not yet posted to the peer.
ContentVector input_contents_;
// Sum of the byte counts of the entries in |input_contents_|.
size_t input_contents_size_;
// ** Peer information.
scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
// How much we've sent to the output that for flow control purposes we
// must assume hasn't been read yet.
size_t output_size_used_;
// Only valid to access on peer_task_runner_.
scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
// Only valid to access on peer_task_runner_ if
// |peer_lifetime_flag_->is_alive| is true.
ByteStreamReaderImpl* peer_;
};
// The consuming end of a byte stream. It receives batches of buffers posted
// by its writer peer, hands them out one at a time through Read(), and
// reports consumption back to the writer by posting window updates to the
// writer's task runner.
class ByteStreamReaderImpl : public ByteStreamReader {
public:
// |task_runner| is the runner this object is used on, |lifetime_flag| is
// the flag this object marks alive on construction and dead on destruction,
// and |buffer_size| is the total size used to decide when to send window
// updates.
ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<LifetimeFlag> lifetime_flag,
size_t buffer_size);
virtual ~ByteStreamReaderImpl();
// Must be called before any operations are performed.
// Records the writer peer, the task runner it lives on and its lifetime
// flag.
void SetPeer(ByteStreamWriterImpl* peer,
scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
scoped_refptr<LifetimeFlag> peer_lifetime_flag);
// Overridden from ByteStreamReader.
// Read() pops one queued chunk into |*data| / |*length| when one is
// available; otherwise it reports whether the stream is complete or merely
// empty.
virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
size_t* length) OVERRIDE;
// Returns the status delivered with stream completion; DCHECKs that a
// status has been received.
virtual int GetStatus() const OVERRIDE;
// Stores the callback run when data arrives into an empty queue or when
// the source completes.
virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
// PostTask target from |ByteStreamWriterImpl::Write| and
// |ByteStreamWriterImpl::Close|.
// Receive data from our peer.
// static because it may be called after the object it is targeting
// has been destroyed. It may not access |*target|
// if |*object_lifetime_flag| is false.
// NOTE(review): the definition in this file names the byte-count parameter
// |buffer_size| rather than |transfer_buffer_bytes|.
static void TransferData(
scoped_refptr<LifetimeFlag> object_lifetime_flag,
ByteStreamReaderImpl* target,
scoped_ptr<ContentVector> transfer_buffer,
size_t transfer_buffer_bytes,
bool source_complete,
int status);
private:
// Called from TransferData once object existence has been validated.
void TransferDataInternal(
scoped_ptr<ContentVector> transfer_buffer,
size_t transfer_buffer_bytes,
bool source_complete,
int status);
// Posts a window update to the writer once enough consumed bytes have
// accumulated without being reported.
void MaybeUpdateInput();
// Total stream buffer size, fixed at construction.
const size_t total_buffer_size_;
// Task runner on which this object is used.
scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
// True while this object is alive.
scoped_refptr<LifetimeFlag> my_lifetime_flag_;
// Chunks received from the writer and not yet handed out by Read().
ContentVector available_contents_;
// Set once the writer has signalled completion; |status_| is meaningful
// only after that.
bool received_status_;
int status_;
// Callback registered through RegisterCallback(); may be null.
base::Closure data_available_callback_;
// Time of last point at which data in stream transitioned from full
// to non-full. Nulled when a callback is sent.
// NOTE(review): no code visible in this file reads or writes this member;
// confirm whether it is still needed.
base::Time last_non_full_time_;
// ** Peer information
scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
// How much has been removed from this class that we haven't told
// the input about yet.
size_t unreported_consumed_bytes_;
// Only valid to access on peer_task_runner_.
scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
// Only valid to access on peer_task_runner_ if
// |peer_lifetime_flag_->is_alive| is true.
ByteStreamWriterImpl* peer_;
};
// Creates a writer that runs on |task_runner| and treats |buffer_size| as
// its flow-control limit. The peer is left unset until SetPeer() is called.
ByteStreamWriterImpl::ByteStreamWriterImpl(
    scoped_refptr<base::SequencedTaskRunner> task_runner,
    scoped_refptr<LifetimeFlag> lifetime_flag,
    size_t buffer_size)
    : total_buffer_size_(buffer_size),
      my_task_runner_(task_runner),
      my_lifetime_flag_(lifetime_flag),
      input_contents_size_(0),
      output_size_used_(0),
      peer_(NULL) {
  // The flag is how tasks posted by the peer learn whether this object
  // still exists, so a null flag is a programming error.
  DCHECK(my_lifetime_flag_.get());

  // Announce that the object now exists.
  my_lifetime_flag_->is_alive = true;
}
// Marks the object as gone so that window-update tasks which carry the same
// flag skip the now-dangling target pointer.
ByteStreamWriterImpl::~ByteStreamWriterImpl() {
  my_lifetime_flag_->is_alive = false;
}
// Wires this writer to its reader: remembers the reader pointer, the task
// runner that reader tasks must be posted to, and the flag that tells those
// tasks whether the reader still exists.
void ByteStreamWriterImpl::SetPeer(
    ByteStreamReaderImpl* peer,
    scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
  peer_ = peer;
  peer_task_runner_ = peer_task_runner;
  peer_lifetime_flag_ = peer_lifetime_flag;
}
// Queues |buffer| (carrying |byte_count| bytes) for delivery to the reader.
//
// Returns false if the write is rejected because accepting it would overflow
// the size_t byte accounting; in that case the input is ignored. Otherwise
// returns whether the total buffered byte count is still within
// |total_buffer_size_| after the write.
bool ByteStreamWriterImpl::Write(
    scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  // Check overflow: how many more bytes can be accounted for before the
  // running total would wrap around.
  //
  // TODO(tyoshino): Discuss with content/browser/download developer and if
  // they're fine with, set smaller limit and make it configurable.
  const size_t already_buffered = GetTotalBufferedBytes();
  const size_t headroom =
      std::numeric_limits<size_t>::max() - already_buffered;
  if (byte_count > headroom) {
    // TODO(tyoshino): Tell the user that Write() failed.
    // Ignore input.
    return false;
  }

  input_contents_.push_back(std::make_pair(buffer, byte_count));
  input_contents_size_ += byte_count;

  // Arbitrarily, we buffer to a fraction (a third) of the total size before
  // sending.
  const size_t send_threshold =
      total_buffer_size_ / kFractionBufferBeforeSending;
  if (input_contents_size_ > send_threshold)
    PostToPeer(false, 0);

  return GetTotalBufferedBytes() <= total_buffer_size_;
}
// Sends whatever is queued locally to the reader right away. Does nothing
// when no bytes are pending.
void ByteStreamWriterImpl::Flush() {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  if (input_contents_size_ == 0)
    return;

  PostToPeer(false, 0);
}
// Signals end of stream: any pending data is posted to the reader together
// with the completion marker and |status|.
void ByteStreamWriterImpl::Close(int status) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  PostToPeer(true, status);
}
// Replaces the "space available" callback with |source_callback|. The
// callback is run from UpdateWindowInternal().
void ByteStreamWriterImpl::RegisterCallback(
    const base::Closure& source_callback) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  space_available_callback_ = source_callback;
}
// Returns the number of bytes the writer currently accounts for: bytes
// queued locally plus bytes already sent whose consumption has not yet been
// reported back.
size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  const size_t queued_locally = input_contents_size_;
  const size_t sent_unacknowledged = output_size_used_;

  // This sum doesn't overflow since Write() fails if this sum is going to
  // overflow.
  return queued_locally + sent_unacknowledged;
}
// static
// Task entry point for window updates posted by the reader. |target| is
// dereferenced only while |lifetime_flag| reports it alive; otherwise the
// update is dropped.
void ByteStreamWriterImpl::UpdateWindow(
    scoped_refptr<LifetimeFlag> lifetime_flag,
    ByteStreamWriterImpl* target,
    size_t bytes_consumed) {
  if (lifetime_flag->is_alive)
    target->UpdateWindowInternal(bytes_consumed);
}
// Applies a consumption report from the reader: releases |bytes_consumed|
// bytes of in-flight accounting and runs the space-available callback if
// that moved the writer from above the limit to at or below it.
void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  const bool over_limit_before =
      GetTotalBufferedBytes() > total_buffer_size_;

  // The reader can never report more than was sent to it.
  DCHECK_GE(output_size_used_, bytes_consumed);
  output_size_used_ -= bytes_consumed;

  const bool over_limit_after = GetTotalBufferedBytes() > total_buffer_size_;

  // Callback only on the transition from "above limit" to "<= limit".
  const bool dropped_to_limit = over_limit_before && !over_limit_after;
  if (dropped_to_limit && !space_available_callback_.is_null())
    space_available_callback_.Run();
}
void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
// Valid contexts in which to call.
DCHECK(complete || 0 != input_contents_size_);
scoped_ptr<ContentVector> transfer_buffer;
size_t buffer_size = 0;
if (0 != input_contents_size_) {
transfer_buffer.reset(new ContentVector);
transfer_buffer->swap(input_contents_);
buffer_size = input_contents_size_;
output_size_used_ += input_contents_size_;
input_contents_size_ = 0;
}
peer_task_runner_->PostTask(
FROM_HERE, base::Bind(
&ByteStreamReaderImpl::TransferData,
peer_lifetime_flag_,
peer_,
base::Passed(&transfer_buffer),
buffer_size,
complete,
status));
}
// Creates a reader that runs on |task_runner| for a stream of total size
// |buffer_size|. No status has been received yet and the peer is left unset
// until SetPeer() is called.
ByteStreamReaderImpl::ByteStreamReaderImpl(
    scoped_refptr<base::SequencedTaskRunner> task_runner,
    scoped_refptr<LifetimeFlag> lifetime_flag,
    size_t buffer_size)
    : total_buffer_size_(buffer_size),
      my_task_runner_(task_runner),
      my_lifetime_flag_(lifetime_flag),
      received_status_(false),
      status_(0),
      unreported_consumed_bytes_(0),
      peer_(NULL) {
  // The flag is how tasks posted by the peer learn whether this object
  // still exists, so a null flag is a programming error.
  DCHECK(my_lifetime_flag_.get());

  // Announce that the object now exists.
  my_lifetime_flag_->is_alive = true;
}
// Marks the object as gone so that data-transfer tasks which carry the same
// flag skip the now-dangling target pointer.
ByteStreamReaderImpl::~ByteStreamReaderImpl() {
  my_lifetime_flag_->is_alive = false;
}
// Wires this reader to its writer: remembers the writer pointer, the task
// runner that writer tasks must be posted to, and the flag that tells those
// tasks whether the writer still exists.
void ByteStreamReaderImpl::SetPeer(
    ByteStreamWriterImpl* peer,
    scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
    scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
  peer_ = peer;
  peer_task_runner_ = peer_task_runner;
  peer_lifetime_flag_ = peer_lifetime_flag;
}
// Hands out the oldest queued chunk, if there is one.
//
// Returns STREAM_HAS_DATA after storing the chunk in |*data| and its byte
// count in |*length|. With nothing queued, returns STREAM_COMPLETE if the
// writer has signalled completion and STREAM_EMPTY otherwise; |*data| and
// |*length| are left untouched in both of those cases.
ByteStreamReaderImpl::StreamState
ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
                           size_t* length) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  if (available_contents_.empty())
    return received_status_ ? STREAM_COMPLETE : STREAM_EMPTY;

  *data = available_contents_.front().first;
  *length = available_contents_.front().second;
  available_contents_.pop_front();

  // Account for the bytes just consumed and let the writer know if enough
  // has accumulated.
  unreported_consumed_bytes_ += *length;
  MaybeUpdateInput();

  return STREAM_HAS_DATA;
}
// Returns the status value delivered with stream completion. Asking before
// completion has been received trips a DCHECK.
int ByteStreamReaderImpl::GetStatus() const {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
  DCHECK(received_status_);

  return status_;
}
// Replaces the "data available" callback with |sink_callback|. The callback
// is run from TransferDataInternal().
void ByteStreamReaderImpl::RegisterCallback(
    const base::Closure& sink_callback) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  data_available_callback_ = sink_callback;
}
// static
// Task entry point for data posted by the writer. |target| is dereferenced
// only while |object_lifetime_flag| reports it alive; otherwise the posted
// data is dropped.
void ByteStreamReaderImpl::TransferData(
    scoped_refptr<LifetimeFlag> object_lifetime_flag,
    ByteStreamReaderImpl* target,
    scoped_ptr<ContentVector> transfer_buffer,
    size_t buffer_size,
    bool source_complete,
    int status) {
  if (object_lifetime_flag->is_alive) {
    target->TransferDataInternal(
        transfer_buffer.Pass(), buffer_size, source_complete, status);
  }
}
// Accepts a batch from the writer: appends the chunks in |transfer_buffer|
// (which may be null) to the read queue, records |status| if
// |source_complete| is set, and runs the data-available callback when the
// queue went from empty to non-empty or the source completed.
// |buffer_size| is not used here.
void ByteStreamReaderImpl::TransferDataInternal(
    scoped_ptr<ContentVector> transfer_buffer,
    size_t buffer_size,
    bool source_complete,
    int status) {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  const bool queue_was_empty = available_contents_.empty();

  if (transfer_buffer) {
    available_contents_.insert(available_contents_.end(),
                               transfer_buffer->begin(),
                               transfer_buffer->end());
  }

  if (source_complete) {
    received_status_ = true;
    status_ = status;
  }

  // Callback on transition from empty to non-empty, or
  // source complete.
  const bool became_readable =
      queue_was_empty && !available_contents_.empty();
  const bool should_notify = became_readable || source_complete;
  if (should_notify && !data_available_callback_.is_null())
    data_available_callback_.Run();
}
// Decides whether to send the writer a window update. An update is posted
// only once the unreported consumption exceeds
// |total_buffer_size_ / kFractionReadBeforeWindowUpdate| (currently a third
// of the total size); the unreported count is then reset to zero.
void ByteStreamReaderImpl::MaybeUpdateInput() {
  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());

  const size_t report_threshold =
      total_buffer_size_ / kFractionReadBeforeWindowUpdate;
  if (unreported_consumed_bytes_ > report_threshold) {
    peer_task_runner_->PostTask(
        FROM_HERE,
        base::Bind(&ByteStreamWriterImpl::UpdateWindow,
                   peer_lifetime_flag_,
                   peer_,
                   unreported_consumed_bytes_));
    unreported_consumed_bytes_ = 0;
  }
}
} // namespace
// Divisor applied to the total buffer size on the writer side: locally
// queued data is posted to the reader once it exceeds
// (total size / kFractionBufferBeforeSending).
const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
// Divisor applied to the total buffer size on the reader side: a window
// update is posted to the writer once unreported consumed bytes exceed
// (total size / kFractionReadBeforeWindowUpdate).
const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
// Out-of-line definitions of the interface destructors.
ByteStreamReader::~ByteStreamReader() { }
ByteStreamWriter::~ByteStreamWriter() { }
// Builds a connected writer/reader pair sharing a flow-control size of
// |buffer_size|. The writer is bound to |input_task_runner| and returned
// through |input|; the reader is bound to |output_task_runner| and returned
// through |output|. Each endpoint gets its own lifetime flag, and each is
// given the other's pointer, task runner and flag before being handed out.
void CreateByteStream(
    scoped_refptr<base::SequencedTaskRunner> input_task_runner,
    scoped_refptr<base::SequencedTaskRunner> output_task_runner,
    size_t buffer_size,
    scoped_ptr<ByteStreamWriter>* input,
    scoped_ptr<ByteStreamReader>* output) {
  scoped_refptr<LifetimeFlag> writer_flag(new LifetimeFlag());
  scoped_refptr<LifetimeFlag> reader_flag(new LifetimeFlag());

  ByteStreamWriterImpl* writer = new ByteStreamWriterImpl(
      input_task_runner, writer_flag, buffer_size);
  ByteStreamReaderImpl* reader = new ByteStreamReaderImpl(
      output_task_runner, reader_flag, buffer_size);

  // Cross-link the two endpoints.
  writer->SetPeer(reader, output_task_runner, reader_flag);
  reader->SetPeer(writer, input_task_runner, writer_flag);

  // Transfer ownership to the caller.
  input->reset(writer);
  output->reset(reader);
}
} // namespace content