// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "jingle/glue/pseudotcp_adapter.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/time/time.h"
#include "net/base/address_list.h"
#include "net/base/completion_callback.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/net_util.h"
using cricket::PseudoTcp;
namespace {
const int kReadBufferSize = 65536; // Maximum size of a packet.
const uint16 kDefaultMtu = 1280;
} // namespace
namespace jingle_glue {
class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
public base::RefCounted<Core> {
public:
Core(net::Socket* socket);
// Functions used to implement net::StreamSocket.
int Read(net::IOBuffer* buffer, int buffer_size,
const net::CompletionCallback& callback);
int Write(net::IOBuffer* buffer, int buffer_size,
const net::CompletionCallback& callback);
int Connect(const net::CompletionCallback& callback);
void Disconnect();
bool IsConnected() const;
// cricket::IPseudoTcpNotify interface.
// These notifications are triggered from NotifyPacket.
virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE;
virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE;
virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE;
// This is triggered by NotifyClock or NotifyPacket.
virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE;
// This is triggered by NotifyClock, NotifyPacket, Recv and Send.
virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
const char* buffer, size_t len) OVERRIDE;
void SetAckDelay(int delay_ms);
void SetNoDelay(bool no_delay);
void SetReceiveBufferSize(int32 size);
void SetSendBufferSize(int32 size);
void SetWriteWaitsForSend(bool write_waits_for_send);
void DeleteSocket();
private:
friend class base::RefCounted<Core>;
virtual ~Core();
// These are invoked by the underlying Socket, and may trigger callbacks.
// They hold a reference to |this| while running, to protect from deletion.
void OnRead(int result);
void OnWritten(int result);
// These may trigger callbacks, so the holder must hold a reference on
// the stack while calling them.
void DoReadFromSocket();
void HandleReadResults(int result);
void HandleTcpClock();
// Checks if current write has completed in the write-waits-for-send
// mode.
void CheckWriteComplete();
// This re-sets |timer| without triggering callbacks.
void AdjustClock();
net::CompletionCallback connect_callback_;
net::CompletionCallback read_callback_;
net::CompletionCallback write_callback_;
cricket::PseudoTcp pseudo_tcp_;
scoped_ptr<net::Socket> socket_;
scoped_refptr<net::IOBuffer> read_buffer_;
int read_buffer_size_;
scoped_refptr<net::IOBuffer> write_buffer_;
int write_buffer_size_;
// Whether we need to wait for data to be sent before completing write.
bool write_waits_for_send_;
// Set to true in the write-waits-for-send mode when we've
// successfully writtend data to the send buffer and waiting for the
// data to be sent to the remote end.
bool waiting_write_position_;
// Number of the bytes written by the last write stored while we wait
// for the data to be sent (i.e. when waiting_write_position_ = true).
int last_write_result_;
bool socket_write_pending_;
scoped_refptr<net::IOBuffer> socket_read_buffer_;
base::OneShotTimer<Core> timer_;
DISALLOW_COPY_AND_ASSIGN(Core);
};
PseudoTcpAdapter::Core::Core(net::Socket* socket)
: pseudo_tcp_(this, 0),
socket_(socket),
write_waits_for_send_(false),
waiting_write_position_(false),
socket_write_pending_(false) {
// Doesn't trigger callbacks.
pseudo_tcp_.NotifyMTU(kDefaultMtu);
}
PseudoTcpAdapter::Core::~Core() {
}
int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size,
const net::CompletionCallback& callback) {
DCHECK(read_callback_.is_null());
// Reference the Core in case a callback deletes the adapter.
scoped_refptr<Core> core(this);
int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
if (result < 0) {
result = net::MapSystemError(pseudo_tcp_.GetError());
DCHECK(result < 0);
}
if (result == net::ERR_IO_PENDING) {
read_buffer_ = buffer;
read_buffer_size_ = buffer_size;
read_callback_ = callback;
}
AdjustClock();
return result;
}
int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size,
const net::CompletionCallback& callback) {
DCHECK(write_callback_.is_null());
// Reference the Core in case a callback deletes the adapter.
scoped_refptr<Core> core(this);
int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
if (result < 0) {
result = net::MapSystemError(pseudo_tcp_.GetError());
DCHECK(result < 0);
}
AdjustClock();
if (result == net::ERR_IO_PENDING) {
write_buffer_ = buffer;
write_buffer_size_ = buffer_size;
write_callback_ = callback;
return result;
}
if (result < 0)
return result;
// Need to wait until the data is sent to the peer when
// send-confirmation mode is enabled.
if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
DCHECK(!waiting_write_position_);
waiting_write_position_ = true;
last_write_result_ = result;
write_buffer_ = buffer;
write_buffer_size_ = buffer_size;
write_callback_ = callback;
return net::ERR_IO_PENDING;
}
return result;
}
int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
// Reference the Core in case a callback deletes the adapter.
scoped_refptr<Core> core(this);
// Start the connection attempt.
int result = pseudo_tcp_.Connect();
if (result < 0)
return net::ERR_FAILED;
AdjustClock();
connect_callback_ = callback;
DoReadFromSocket();
return net::ERR_IO_PENDING;
}
void PseudoTcpAdapter::Core::Disconnect() {
// Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
read_callback_.Reset();
read_buffer_ = NULL;
write_callback_.Reset();
write_buffer_ = NULL;
connect_callback_.Reset();
// TODO(wez): Connect should succeed if called after Disconnect, which
// PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
// and create a new one in Connect.
// TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
// effect. This should be addressed in PseudoTcp, really.
// In the meantime we can fake OnTcpClosed notification and tear down the
// PseudoTcp.
pseudo_tcp_.Close(true);
}
bool PseudoTcpAdapter::Core::IsConnected() const {
return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
}
void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
DCHECK(tcp == &pseudo_tcp_);
if (!connect_callback_.is_null()) {
net::CompletionCallback callback = connect_callback_;
connect_callback_.Reset();
callback.Run(net::OK);
}
OnTcpReadable(tcp);
OnTcpWriteable(tcp);
}
void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
DCHECK_EQ(tcp, &pseudo_tcp_);
if (read_callback_.is_null())
return;
int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
if (result < 0) {
result = net::MapSystemError(pseudo_tcp_.GetError());
DCHECK(result < 0);
if (result == net::ERR_IO_PENDING)
return;
}
AdjustClock();
net::CompletionCallback callback = read_callback_;
read_callback_.Reset();
read_buffer_ = NULL;
callback.Run(result);
}
void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
DCHECK_EQ(tcp, &pseudo_tcp_);
if (write_callback_.is_null())
return;
if (waiting_write_position_) {
CheckWriteComplete();
return;
}
int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
if (result < 0) {
result = net::MapSystemError(pseudo_tcp_.GetError());
DCHECK(result < 0);
if (result == net::ERR_IO_PENDING)
return;
}
AdjustClock();
if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
DCHECK(!waiting_write_position_);
waiting_write_position_ = true;
last_write_result_ = result;
return;
}
net::CompletionCallback callback = write_callback_;
write_callback_.Reset();
write_buffer_ = NULL;
callback.Run(result);
}
void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
DCHECK_EQ(tcp, &pseudo_tcp_);
if (!connect_callback_.is_null()) {
net::CompletionCallback callback = connect_callback_;
connect_callback_.Reset();
callback.Run(net::MapSystemError(error));
}
if (!read_callback_.is_null()) {
net::CompletionCallback callback = read_callback_;
read_callback_.Reset();
callback.Run(net::MapSystemError(error));
}
if (!write_callback_.is_null()) {
net::CompletionCallback callback = write_callback_;
write_callback_.Reset();
callback.Run(net::MapSystemError(error));
}
}
void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
}
void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
}
void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
}
void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
}
void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
write_waits_for_send_ = write_waits_for_send;
}
void PseudoTcpAdapter::Core::DeleteSocket() {
socket_.reset();
}
cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
PseudoTcp* tcp,
const char* buffer,
size_t len) {
DCHECK_EQ(tcp, &pseudo_tcp_);
// If we already have a write pending, we behave like a congested network,
// returning success for the write, but dropping the packet. PseudoTcp will
// back-off and retransmit, adjusting for the perceived congestion.
if (socket_write_pending_)
return IPseudoTcpNotify::WR_SUCCESS;
scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
memcpy(write_buffer->data(), buffer, len);
// Our underlying socket is datagram-oriented, which means it should either
// send exactly as many bytes as we requested, or fail.
int result;
if (socket_.get()) {
result = socket_->Write(
write_buffer.get(),
len,
base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
} else {
result = net::ERR_CONNECTION_CLOSED;
}
if (result == net::ERR_IO_PENDING) {
socket_write_pending_ = true;
return IPseudoTcpNotify::WR_SUCCESS;
} else if (result == net::ERR_MSG_TOO_BIG) {
return IPseudoTcpNotify::WR_TOO_LARGE;
} else if (result < 0) {
return IPseudoTcpNotify::WR_FAIL;
} else {
return IPseudoTcpNotify::WR_SUCCESS;
}
}
void PseudoTcpAdapter::Core::DoReadFromSocket() {
if (!socket_read_buffer_.get())
socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
int result = 1;
while (socket_.get() && result > 0) {
result = socket_->Read(
socket_read_buffer_.get(),
kReadBufferSize,
base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
if (result != net::ERR_IO_PENDING)
HandleReadResults(result);
}
}
void PseudoTcpAdapter::Core::HandleReadResults(int result) {
if (result <= 0) {
LOG(ERROR) << "Read returned " << result;
return;
}
// TODO(wez): Disconnect on failure of NotifyPacket?
pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
AdjustClock();
CheckWriteComplete();
}
void PseudoTcpAdapter::Core::OnRead(int result) {
// Reference the Core in case a callback deletes the adapter.
scoped_refptr<Core> core(this);
HandleReadResults(result);
if (result >= 0)
DoReadFromSocket();
}
void PseudoTcpAdapter::Core::OnWritten(int result) {
// Reference the Core in case a callback deletes the adapter.
scoped_refptr<Core> core(this);
socket_write_pending_ = false;
if (result < 0) {
LOG(WARNING) << "Write failed. Error code: " << result;
}
}
void PseudoTcpAdapter::Core::AdjustClock() {
long timeout = 0;
if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
timer_.Stop();
timer_.Start(FROM_HERE,
base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
&PseudoTcpAdapter::Core::HandleTcpClock);
}
}
void PseudoTcpAdapter::Core::HandleTcpClock() {
// Reference the Core in case a callback deletes the adapter.
scoped_refptr<Core> core(this);
pseudo_tcp_.NotifyClock(PseudoTcp::Now());
AdjustClock();
CheckWriteComplete();
}
void PseudoTcpAdapter::Core::CheckWriteComplete() {
if (!write_callback_.is_null() && waiting_write_position_) {
if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
waiting_write_position_ = false;
net::CompletionCallback callback = write_callback_;
write_callback_.Reset();
write_buffer_ = NULL;
callback.Run(last_write_result_);
}
}
}
// Public interface implemention.
PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
: core_(new Core(socket)) {
}
PseudoTcpAdapter::~PseudoTcpAdapter() {
Disconnect();
// Make sure that the underlying socket is destroyed before PseudoTcp.
core_->DeleteSocket();
}
int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
const net::CompletionCallback& callback) {
DCHECK(CalledOnValidThread());
return core_->Read(buffer, buffer_size, callback);
}
int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
const net::CompletionCallback& callback) {
DCHECK(CalledOnValidThread());
return core_->Write(buffer, buffer_size, callback);
}
bool PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
DCHECK(CalledOnValidThread());
core_->SetReceiveBufferSize(size);
return false;
}
bool PseudoTcpAdapter::SetSendBufferSize(int32 size) {
DCHECK(CalledOnValidThread());
core_->SetSendBufferSize(size);
return false;
}
int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
DCHECK(CalledOnValidThread());
// net::StreamSocket requires that Connect return OK if already connected.
if (IsConnected())
return net::OK;
return core_->Connect(callback);
}
void PseudoTcpAdapter::Disconnect() {
DCHECK(CalledOnValidThread());
core_->Disconnect();
}
bool PseudoTcpAdapter::IsConnected() const {
return core_->IsConnected();
}
bool PseudoTcpAdapter::IsConnectedAndIdle() const {
DCHECK(CalledOnValidThread());
NOTIMPLEMENTED();
return false;
}
int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
DCHECK(CalledOnValidThread());
// We don't have a meaningful peer address, but we can't return an
// error, so we return a INADDR_ANY instead.
net::IPAddressNumber ip_address(net::kIPv4AddressSize);
*address = net::IPEndPoint(ip_address, 0);
return net::OK;
}
int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
DCHECK(CalledOnValidThread());
NOTIMPLEMENTED();
return net::ERR_FAILED;
}
const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
DCHECK(CalledOnValidThread());
return net_log_;
}
void PseudoTcpAdapter::SetSubresourceSpeculation() {
DCHECK(CalledOnValidThread());
NOTIMPLEMENTED();
}
void PseudoTcpAdapter::SetOmniboxSpeculation() {
DCHECK(CalledOnValidThread());
NOTIMPLEMENTED();
}
bool PseudoTcpAdapter::WasEverUsed() const {
DCHECK(CalledOnValidThread());
NOTIMPLEMENTED();
return true;
}
bool PseudoTcpAdapter::UsingTCPFastOpen() const {
DCHECK(CalledOnValidThread());
return false;
}
bool PseudoTcpAdapter::WasNpnNegotiated() const {
DCHECK(CalledOnValidThread());
return false;
}
net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
DCHECK(CalledOnValidThread());
return net::kProtoUnknown;
}
bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
DCHECK(CalledOnValidThread());
return false;
}
void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
DCHECK(CalledOnValidThread());
core_->SetAckDelay(delay_ms);
}
void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
DCHECK(CalledOnValidThread());
core_->SetNoDelay(no_delay);
}
void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
DCHECK(CalledOnValidThread());
core_->SetWriteWaitsForSend(write_waits_for_send);
}
} // namespace jingle_glue