普通文本  |  201行  |  5.94 KB

// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "jingle/glue/channel_socket_adapter.h"

#include <limits>

#include "base/callback.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"

namespace jingle_glue {

TransportChannelSocketAdapter::TransportChannelSocketAdapter(
    cricket::TransportChannel* channel)
    : message_loop_(base::MessageLoop::current()),
      channel_(channel),
      closed_error_code_(net::OK) {
  DCHECK(channel_);

  channel_->SignalReadPacket.connect(
      this, &TransportChannelSocketAdapter::OnNewPacket);
  channel_->SignalWritableState.connect(
      this, &TransportChannelSocketAdapter::OnWritableState);
  channel_->SignalDestroyed.connect(
      this, &TransportChannelSocketAdapter::OnChannelDestroyed);
}

TransportChannelSocketAdapter::~TransportChannelSocketAdapter() {
  if (!destruction_callback_.is_null())
    destruction_callback_.Run();
}

void TransportChannelSocketAdapter::SetOnDestroyedCallback(
    const base::Closure& callback) {
  destruction_callback_ = callback;
}

int TransportChannelSocketAdapter::Read(
    net::IOBuffer* buf,
    int buffer_size,
    const net::CompletionCallback& callback) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK(buf);
  DCHECK(!callback.is_null());
  CHECK(read_callback_.is_null());

  if (!channel_) {
    DCHECK(closed_error_code_ != net::OK);
    return closed_error_code_;
  }

  read_callback_ = callback;
  read_buffer_ = buf;
  read_buffer_size_ = buffer_size;

  return net::ERR_IO_PENDING;
}

int TransportChannelSocketAdapter::Write(
    net::IOBuffer* buffer,
    int buffer_size,
    const net::CompletionCallback& callback) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK(buffer);
  DCHECK(!callback.is_null());
  CHECK(write_callback_.is_null());

  if (!channel_) {
    DCHECK(closed_error_code_ != net::OK);
    return closed_error_code_;
  }

  int result;
  talk_base::PacketOptions options;
  if (channel_->writable()) {
    result = channel_->SendPacket(buffer->data(), buffer_size, options);
    if (result < 0) {
      result = net::MapSystemError(channel_->GetError());

      // If the underlying socket returns IO pending where it shouldn't we
      // pretend the packet is dropped and return as succeeded because no
      // writeable callback will happen.
      if (result == net::ERR_IO_PENDING)
        result = net::OK;
    }
  } else {
    // Channel is not writable yet.
    result = net::ERR_IO_PENDING;
    write_callback_ = callback;
    write_buffer_ = buffer;
    write_buffer_size_ = buffer_size;
  }

  return result;
}

int TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  return (channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0) ?
      net::OK : net::ERR_SOCKET_SET_RECEIVE_BUFFER_SIZE_ERROR;
}

int TransportChannelSocketAdapter::SetSendBufferSize(int32 size) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  return (channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0) ?
      net::OK : net::ERR_SOCKET_SET_SEND_BUFFER_SIZE_ERROR;
}

void TransportChannelSocketAdapter::Close(int error_code) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  if (!channel_)  // Already closed.
    return;

  DCHECK(error_code != net::OK);
  closed_error_code_ = error_code;
  channel_->SignalReadPacket.disconnect(this);
  channel_->SignalDestroyed.disconnect(this);
  channel_ = NULL;

  if (!read_callback_.is_null()) {
    net::CompletionCallback callback = read_callback_;
    read_callback_.Reset();
    read_buffer_ = NULL;
    callback.Run(error_code);
  }

  if (!write_callback_.is_null()) {
    net::CompletionCallback callback = write_callback_;
    write_callback_.Reset();
    write_buffer_ = NULL;
    callback.Run(error_code);
  }
}

void TransportChannelSocketAdapter::OnNewPacket(
    cricket::TransportChannel* channel,
    const char* data,
    size_t data_size,
    const talk_base::PacketTime& packet_time,
    int flags) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK_EQ(channel, channel_);
  if (!read_callback_.is_null()) {
    DCHECK(read_buffer_.get());
    CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max()));

    if (read_buffer_size_ < static_cast<int>(data_size)) {
      LOG(WARNING) << "Data buffer is smaller than the received packet. "
                   << "Dropping the data that doesn't fit.";
      data_size = read_buffer_size_;
    }

    memcpy(read_buffer_->data(), data, data_size);

    net::CompletionCallback callback = read_callback_;
    read_callback_.Reset();
    read_buffer_ = NULL;

    callback.Run(data_size);
  } else {
    LOG(WARNING)
        << "Data was received without a callback. Dropping the packet.";
  }
}

void TransportChannelSocketAdapter::OnWritableState(
    cricket::TransportChannel* channel) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  // Try to send the packet if there is a pending write.
  if (!write_callback_.is_null()) {
    talk_base::PacketOptions options;
    int result = channel_->SendPacket(write_buffer_->data(),
                                      write_buffer_size_,
                                      options);
    if (result < 0)
      result = net::MapSystemError(channel_->GetError());

    if (result != net::ERR_IO_PENDING) {
      net::CompletionCallback callback = write_callback_;
      write_callback_.Reset();
      write_buffer_ = NULL;
      callback.Run(result);
    }
  }
}

void TransportChannelSocketAdapter::OnChannelDestroyed(
    cricket::TransportChannel* channel) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK_EQ(channel, channel_);
  Close(net::ERR_CONNECTION_ABORTED);
}

}  // namespace jingle_glue