普通文本  |  1117行  |  39.68 KB

// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/websockets/websocket_channel.h"

#include <limits.h>  // for INT_MAX

#include <algorithm>
#include <deque>

#include "base/basictypes.h"  // for size_t
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram.h"
#include "base/numerics/safe_conversions.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/time/time.h"
#include "net/base/io_buffer.h"
#include "net/base/net_log.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_util.h"
#include "net/websockets/websocket_errors.h"
#include "net/websockets/websocket_event_interface.h"
#include "net/websockets/websocket_frame.h"
#include "net/websockets/websocket_handshake_request_info.h"
#include "net/websockets/websocket_handshake_response_info.h"
#include "net/websockets/websocket_mux.h"
#include "net/websockets/websocket_stream.h"
#include "url/origin.h"

namespace net {

namespace {

using base::StreamingUtf8Validator;

const int kDefaultSendQuotaLowWaterMark = 1 << 16;
const int kDefaultSendQuotaHighWaterMark = 1 << 17;
const size_t kWebSocketCloseCodeLength = 2;
// This timeout is based on TCPMaximumSegmentLifetime * 2 from
// MainThreadWebSocketChannel.cpp in Blink.
const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;

typedef WebSocketEventInterface::ChannelState ChannelState;
const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;

// Maximum close reason length = max control frame payload -
//                               status code length
//                             = 125 - 2
const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;

// Check a close status code for strict compliance with RFC6455. This is only
// used for close codes received from a renderer that we are intending to send
// out over the network. See ParseClose() for the restrictions on incoming close
// codes. The |code| parameter is type int for convenience of implementation;
// the real type is uint16. Code 1005 is treated specially; it cannot be set
// explicitly by Javascript but the renderer uses it to indicate we should send
// a Close frame with no payload.
bool IsStrictlyValidCloseStatusCode(int code) {
  static const int kInvalidRanges[] = {
      // [BAD, OK)
      0,    1000,   // 1000 is the first valid code
      1006, 1007,   // 1006 MUST NOT be set.
      1014, 3000,   // 1014 unassigned; 1015 up to 2999 are reserved.
      5000, 65536,  // Codes above 5000 are invalid.
  };
  const int* const kInvalidRangesEnd =
      kInvalidRanges + arraysize(kInvalidRanges);

  DCHECK_GE(code, 0);
  DCHECK_LT(code, 65536);
  const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
  DCHECK_NE(kInvalidRangesEnd, upper);
  DCHECK_GT(upper, kInvalidRanges);
  DCHECK_GT(*upper, code);
  DCHECK_LE(*(upper - 1), code);
  return ((upper - kInvalidRanges) % 2) == 0;
}

// This function avoids a bunch of boilerplate code.
void AllowUnused(ChannelState ALLOW_UNUSED unused) {}

// Sets |name| to the name of the frame type for the given |opcode|. Note that
// for all of Text, Binary and Continuation opcode, this method returns
// "Data frame".
void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
                           std::string* name) {
  switch (opcode) {
    case WebSocketFrameHeader::kOpCodeText:    // fall-thru
    case WebSocketFrameHeader::kOpCodeBinary:  // fall-thru
    case WebSocketFrameHeader::kOpCodeContinuation:
      *name = "Data frame";
      break;

    case WebSocketFrameHeader::kOpCodePing:
      *name = "Ping";
      break;

    case WebSocketFrameHeader::kOpCodePong:
      *name = "Pong";
      break;

    case WebSocketFrameHeader::kOpCodeClose:
      *name = "Close";
      break;

    default:
      *name = "Unknown frame type";
      break;
  }

  return;
}

}  // namespace

// A class to encapsulate a set of frames and information about the size of
// those frames.
class WebSocketChannel::SendBuffer {
 public:
  SendBuffer() : total_bytes_(0) {}

  // Add a WebSocketFrame to the buffer and increase total_bytes_.
  void AddFrame(scoped_ptr<WebSocketFrame> chunk);

  // Return a pointer to the frames_ for write purposes.
  ScopedVector<WebSocketFrame>* frames() { return &frames_; }

 private:
  // The frames_ that will be sent in the next call to WriteFrames().
  ScopedVector<WebSocketFrame> frames_;

  // The total size of the payload data in |frames_|. This will be used to
  // measure the throughput of the link.
  // TODO(ricea): Measure the throughput of the link.
  size_t total_bytes_;
};

void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
  total_bytes_ += frame->header.payload_length;
  frames_.push_back(frame.release());
}

// Implementation of WebSocketStream::ConnectDelegate that simply forwards the
// calls on to the WebSocketChannel that created it.
class WebSocketChannel::ConnectDelegate
    : public WebSocketStream::ConnectDelegate {
 public:
  explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}

  virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
    creator_->OnConnectSuccess(stream.Pass());
    // |this| may have been deleted.
  }

  virtual void OnFailure(const std::string& message) OVERRIDE {
    creator_->OnConnectFailure(message);
    // |this| has been deleted.
  }

  virtual void OnStartOpeningHandshake(
      scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
    creator_->OnStartOpeningHandshake(request.Pass());
  }

  virtual void OnFinishOpeningHandshake(
      scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
    creator_->OnFinishOpeningHandshake(response.Pass());
  }

  virtual void OnSSLCertificateError(
      scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks>
          ssl_error_callbacks,
      const SSLInfo& ssl_info,
      bool fatal) OVERRIDE {
    creator_->OnSSLCertificateError(
        ssl_error_callbacks.Pass(), ssl_info, fatal);
  }

 private:
  // A pointer to the WebSocketChannel that created this object. There is no
  // danger of this pointer being stale, because deleting the WebSocketChannel
  // cancels the connect process, deleting this object and preventing its
  // callbacks from being called.
  WebSocketChannel* const creator_;

  DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
};

class WebSocketChannel::HandshakeNotificationSender
    : public base::SupportsWeakPtr<HandshakeNotificationSender> {
 public:
  explicit HandshakeNotificationSender(WebSocketChannel* channel);
  ~HandshakeNotificationSender();

  static void Send(base::WeakPtr<HandshakeNotificationSender> sender);

  ChannelState SendImmediately(WebSocketEventInterface* event_interface);

  const WebSocketHandshakeRequestInfo* handshake_request_info() const {
    return handshake_request_info_.get();
  }

  void set_handshake_request_info(
      scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
    handshake_request_info_ = request_info.Pass();
  }

  const WebSocketHandshakeResponseInfo* handshake_response_info() const {
    return handshake_response_info_.get();
  }

  void set_handshake_response_info(
      scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
    handshake_response_info_ = response_info.Pass();
  }

 private:
  WebSocketChannel* owner_;
  scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
  scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
};

WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
    WebSocketChannel* channel)
    : owner_(channel) {}

WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}

void WebSocketChannel::HandshakeNotificationSender::Send(
    base::WeakPtr<HandshakeNotificationSender> sender) {
  // Do nothing if |sender| is already destructed.
  if (sender) {
    WebSocketChannel* channel = sender->owner_;
    AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
  }
}

ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
    WebSocketEventInterface* event_interface) {

  if (handshake_request_info_.get()) {
    if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
                               handshake_request_info_.Pass()))
      return CHANNEL_DELETED;
  }

  if (handshake_response_info_.get()) {
    if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
                               handshake_response_info_.Pass()))
      return CHANNEL_DELETED;

    // TODO(yhirano): We can release |this| to save memory because
    // there will be no more opening handshake notification.
  }

  return CHANNEL_ALIVE;
}

WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
    bool final,
    WebSocketFrameHeader::OpCode opcode,
    const scoped_refptr<IOBuffer>& data,
    size_t offset,
    size_t size)
    : final_(final),
      opcode_(opcode),
      data_(data),
      offset_(offset),
      size_(size) {}

WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}

void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
  DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
  opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
}

void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
  DCHECK_LE(offset_, size_);
  DCHECK_LE(bytes, size_ - offset_);
  offset_ += bytes;
}

WebSocketChannel::WebSocketChannel(
    scoped_ptr<WebSocketEventInterface> event_interface,
    URLRequestContext* url_request_context)
    : event_interface_(event_interface.Pass()),
      url_request_context_(url_request_context),
      send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
      send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
      current_send_quota_(0),
      current_receive_quota_(0),
      timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
      received_close_code_(0),
      state_(FRESHLY_CONSTRUCTED),
      notification_sender_(new HandshakeNotificationSender(this)),
      sending_text_message_(false),
      receiving_text_message_(false),
      expecting_to_handle_continuation_(false),
      initial_frame_forwarded_(false) {}

WebSocketChannel::~WebSocketChannel() {
  // The stream may hold a pointer to read_frames_, and so it needs to be
  // destroyed first.
  stream_.reset();
  // The timer may have a callback pointing back to us, so stop it just in case
  // someone decides to run the event loop from their destructor.
  timer_.Stop();
}

void WebSocketChannel::SendAddChannelRequest(
    const GURL& socket_url,
    const std::vector<std::string>& requested_subprotocols,
    const url::Origin& origin) {
  // Delegate to the tested version.
  SendAddChannelRequestWithSuppliedCreator(
      socket_url,
      requested_subprotocols,
      origin,
      base::Bind(&WebSocketStream::CreateAndConnectStream));
}

void WebSocketChannel::SetState(State new_state) {
  DCHECK_NE(state_, new_state);

  if (new_state == CONNECTED)
    established_on_ = base::TimeTicks::Now();
  if (state_ == CONNECTED && !established_on_.is_null()) {
    UMA_HISTOGRAM_LONG_TIMES(
        "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
  }

  state_ = new_state;
}

bool WebSocketChannel::InClosingState() const {
  // The state RECV_CLOSED is not supported here, because it is only used in one
  // code path and should not leak into the code in general.
  DCHECK_NE(RECV_CLOSED, state_)
      << "InClosingState called with state_ == RECV_CLOSED";
  return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
}

void WebSocketChannel::SendFrame(bool fin,
                                 WebSocketFrameHeader::OpCode op_code,
                                 const std::vector<char>& data) {
  if (data.size() > INT_MAX) {
    NOTREACHED() << "Frame size sanity check failed";
    return;
  }
  if (stream_ == NULL) {
    LOG(DFATAL) << "Got SendFrame without a connection established; "
                << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
                << " data.size()=" << data.size();
    return;
  }
  if (InClosingState()) {
    DVLOG(1) << "SendFrame called in state " << state_
             << ". This may be a bug, or a harmless race.";
    return;
  }
  if (state_ != CONNECTED) {
    NOTREACHED() << "SendFrame() called in state " << state_;
    return;
  }
  if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
    // TODO(ricea): Kill renderer.
    AllowUnused(
        FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
    // |this| has been deleted.
    return;
  }
  if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
    LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
                << "; misbehaving renderer? fin=" << fin
                << " data.size()=" << data.size();
    return;
  }
  if (op_code == WebSocketFrameHeader::kOpCodeText ||
      (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
       sending_text_message_)) {
    StreamingUtf8Validator::State state =
        outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
    if (state == StreamingUtf8Validator::INVALID ||
        (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
      // TODO(ricea): Kill renderer.
      AllowUnused(
          FailChannel("Browser sent a text frame containing invalid UTF-8",
                      kWebSocketErrorGoingAway,
                      ""));
      // |this| has been deleted.
      return;
    }
    sending_text_message_ = !fin;
    DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
  }
  current_send_quota_ -= data.size();
  // TODO(ricea): If current_send_quota_ has dropped below
  // send_quota_low_water_mark_, it might be good to increase the "low
  // water mark" and "high water mark", but only if the link to the WebSocket
  // server is not saturated.
  scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
  std::copy(data.begin(), data.end(), buffer->data());
  AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
  // |this| may have been deleted.
}

void WebSocketChannel::SendFlowControl(int64 quota) {
  DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
         state_ == CLOSE_WAIT);
  // TODO(ricea): Kill the renderer if it tries to send us a negative quota
  // value or > INT_MAX.
  DCHECK_GE(quota, 0);
  DCHECK_LE(quota, INT_MAX);
  if (!pending_received_frames_.empty()) {
    DCHECK_EQ(0, current_receive_quota_);
  }
  while (!pending_received_frames_.empty() && quota > 0) {
    PendingReceivedFrame& front = pending_received_frames_.front();
    const size_t data_size = front.size() - front.offset();
    const size_t bytes_to_send =
        std::min(base::checked_cast<size_t>(quota), data_size);
    const bool final = front.final() && data_size == bytes_to_send;
    const char* data =
        front.data().get() ? front.data()->data() + front.offset() : NULL;
    DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
    const std::vector<char> data_vector(data, data + bytes_to_send);
    DVLOG(3) << "Sending frame previously split due to quota to the "
             << "renderer: quota=" << quota << " data_size=" << data_size
             << " bytes_to_send=" << bytes_to_send;
    if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
        CHANNEL_DELETED)
      return;
    if (bytes_to_send < data_size) {
      front.DidConsume(bytes_to_send);
      front.ResetOpcode();
      return;
    }
    const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
    DCHECK_GE(quota, signed_bytes_to_send);
    quota -= signed_bytes_to_send;

    pending_received_frames_.pop();
  }
  // If current_receive_quota_ == 0 then there is no pending ReadFrames()
  // operation.
  const bool start_read =
      current_receive_quota_ == 0 && quota > 0 &&
      (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
  current_receive_quota_ += base::checked_cast<int>(quota);
  if (start_read)
    AllowUnused(ReadFrames());
  // |this| may have been deleted.
}

void WebSocketChannel::StartClosingHandshake(uint16 code,
                                             const std::string& reason) {
  if (InClosingState()) {
    // When the associated renderer process is killed while the channel is in
    // CLOSING state we reach here.
    DVLOG(1) << "StartClosingHandshake called in state " << state_
             << ". This may be a bug, or a harmless race.";
    return;
  }
  if (state_ == CONNECTING) {
    // Abort the in-progress handshake and drop the connection immediately.
    stream_request_.reset();
    SetState(CLOSED);
    AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
    return;
  }
  if (state_ != CONNECTED) {
    NOTREACHED() << "StartClosingHandshake() called in state " << state_;
    return;
  }
  // Javascript actually only permits 1000 and 3000-4999, but the implementation
  // itself may produce different codes. The length of |reason| is also checked
  // by Javascript.
  if (!IsStrictlyValidCloseStatusCode(code) ||
      reason.size() > kMaximumCloseReasonLength) {
    // "InternalServerError" is actually used for errors from any endpoint, per
    // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
    // reason it must be malfunctioning in some way, and based on that we
    // interpret this as an internal error.
    if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
      DCHECK_EQ(CONNECTED, state_);
      SetState(SEND_CLOSED);
    }
    return;
  }
  if (SendClose(
          code,
          StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
      CHANNEL_DELETED)
    return;
  DCHECK_EQ(CONNECTED, state_);
  SetState(SEND_CLOSED);
}

void WebSocketChannel::SendAddChannelRequestForTesting(
    const GURL& socket_url,
    const std::vector<std::string>& requested_subprotocols,
    const url::Origin& origin,
    const WebSocketStreamCreator& creator) {
  SendAddChannelRequestWithSuppliedCreator(
      socket_url, requested_subprotocols, origin, creator);
}

void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
    base::TimeDelta delay) {
  timeout_ = delay;
}

void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
    const GURL& socket_url,
    const std::vector<std::string>& requested_subprotocols,
    const url::Origin& origin,
    const WebSocketStreamCreator& creator) {
  DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
  if (!socket_url.SchemeIsWSOrWSS()) {
    // TODO(ricea): Kill the renderer (this error should have been caught by
    // Javascript).
    AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
    // |this| is deleted here.
    return;
  }
  socket_url_ = socket_url;
  scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
      new ConnectDelegate(this));
  stream_request_ = creator.Run(socket_url_,
                                requested_subprotocols,
                                origin,
                                url_request_context_,
                                BoundNetLog(),
                                connect_delegate.Pass());
  SetState(CONNECTING);
}

void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
  DCHECK(stream);
  DCHECK_EQ(CONNECTING, state_);

  stream_ = stream.Pass();

  SetState(CONNECTED);

  if (event_interface_->OnAddChannelResponse(
          false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
      CHANNEL_DELETED)
    return;

  // TODO(ricea): Get flow control information from the WebSocketStream once we
  // have a multiplexing WebSocketStream.
  current_send_quota_ = send_quota_high_water_mark_;
  if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
      CHANNEL_DELETED)
    return;

  // |stream_request_| is not used once the connection has succeeded.
  stream_request_.reset();

  AllowUnused(ReadFrames());
  // |this| may have been deleted.
}

void WebSocketChannel::OnConnectFailure(const std::string& message) {
  DCHECK_EQ(CONNECTING, state_);

  // Copy the message before we delete its owner.
  std::string message_copy = message;

  SetState(CLOSED);
  stream_request_.reset();

  if (CHANNEL_DELETED ==
      notification_sender_->SendImmediately(event_interface_.get())) {
    // |this| has been deleted.
    return;
  }
  AllowUnused(event_interface_->OnFailChannel(message_copy));
  // |this| has been deleted.
}

void WebSocketChannel::OnSSLCertificateError(
    scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,
    const SSLInfo& ssl_info,
    bool fatal) {
  AllowUnused(event_interface_->OnSSLCertificateError(
      ssl_error_callbacks.Pass(), socket_url_, ssl_info, fatal));
}

void WebSocketChannel::OnStartOpeningHandshake(
    scoped_ptr<WebSocketHandshakeRequestInfo> request) {
  DCHECK(!notification_sender_->handshake_request_info());

  // Because it is hard to handle an IPC error synchronously is difficult,
  // we asynchronously notify the information.
  notification_sender_->set_handshake_request_info(request.Pass());
  ScheduleOpeningHandshakeNotification();
}

void WebSocketChannel::OnFinishOpeningHandshake(
    scoped_ptr<WebSocketHandshakeResponseInfo> response) {
  DCHECK(!notification_sender_->handshake_response_info());

  // Because it is hard to handle an IPC error synchronously is difficult,
  // we asynchronously notify the information.
  notification_sender_->set_handshake_response_info(response.Pass());
  ScheduleOpeningHandshakeNotification();
}

void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
  base::MessageLoop::current()->PostTask(
      FROM_HERE,
      base::Bind(HandshakeNotificationSender::Send,
                 notification_sender_->AsWeakPtr()));
}

ChannelState WebSocketChannel::WriteFrames() {
  int result = OK;
  do {
    // This use of base::Unretained is safe because this object owns the
    // WebSocketStream and destroying it cancels all callbacks.
    result = stream_->WriteFrames(
        data_being_sent_->frames(),
        base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
                   base::Unretained(this),
                   false));
    if (result != ERR_IO_PENDING) {
      if (OnWriteDone(true, result) == CHANNEL_DELETED)
        return CHANNEL_DELETED;
      // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
      // guaranteed to be the same as before OnWriteDone() call.
    }
  } while (result == OK && data_being_sent_);
  return CHANNEL_ALIVE;
}

ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
  DCHECK_NE(CONNECTING, state_);
  DCHECK_NE(ERR_IO_PENDING, result);
  DCHECK(data_being_sent_);
  switch (result) {
    case OK:
      if (data_to_send_next_) {
        data_being_sent_ = data_to_send_next_.Pass();
        if (!synchronous)
          return WriteFrames();
      } else {
        data_being_sent_.reset();
        if (current_send_quota_ < send_quota_low_water_mark_) {
          // TODO(ricea): Increase low_water_mark and high_water_mark if
          // throughput is high, reduce them if throughput is low.  Low water
          // mark needs to be >= the bandwidth delay product *of the IPC
          // channel*. Because factors like context-switch time, thread wake-up
          // time, and bus speed come into play it is complex and probably needs
          // to be determined empirically.
          DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
          // TODO(ricea): Truncate quota by the quota specified by the remote
          // server, if the protocol in use supports quota.
          int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
          current_send_quota_ += fresh_quota;
          return event_interface_->OnFlowControl(fresh_quota);
        }
      }
      return CHANNEL_ALIVE;

    // If a recoverable error condition existed, it would go here.

    default:
      DCHECK_LT(result, 0)
          << "WriteFrames() should only return OK or ERR_ codes";

      stream_->Close();
      SetState(CLOSED);
      return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
  }
}

ChannelState WebSocketChannel::ReadFrames() {
  int result = OK;
  while (result == OK && current_receive_quota_ > 0) {
    // This use of base::Unretained is safe because this object owns the
    // WebSocketStream, and any pending reads will be cancelled when it is
    // destroyed.
    result = stream_->ReadFrames(
        &read_frames_,
        base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
                   base::Unretained(this),
                   false));
    if (result != ERR_IO_PENDING) {
      if (OnReadDone(true, result) == CHANNEL_DELETED)
        return CHANNEL_DELETED;
    }
    DCHECK_NE(CLOSED, state_);
  }
  return CHANNEL_ALIVE;
}

ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
  DCHECK_NE(CONNECTING, state_);
  DCHECK_NE(ERR_IO_PENDING, result);
  switch (result) {
    case OK:
      // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
      // with no data read, not an empty response.
      DCHECK(!read_frames_.empty())
          << "ReadFrames() returned OK, but nothing was read.";
      for (size_t i = 0; i < read_frames_.size(); ++i) {
        scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
        read_frames_[i] = NULL;
        if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
          return CHANNEL_DELETED;
      }
      read_frames_.clear();
      // There should always be a call to ReadFrames pending.
      // TODO(ricea): Unless we are out of quota.
      DCHECK_NE(CLOSED, state_);
      if (!synchronous)
        return ReadFrames();
      return CHANNEL_ALIVE;

    case ERR_WS_PROTOCOL_ERROR:
      // This could be kWebSocketErrorProtocolError (specifically, non-minimal
      // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
      // extension-specific error.
      return FailChannel("Invalid frame header",
                         kWebSocketErrorProtocolError,
                         "WebSocket Protocol Error");

    default:
      DCHECK_LT(result, 0)
          << "ReadFrames() should only return OK or ERR_ codes";

      stream_->Close();
      SetState(CLOSED);

      uint16 code = kWebSocketErrorAbnormalClosure;
      std::string reason = "";
      bool was_clean = false;
      if (received_close_code_ != 0) {
        code = received_close_code_;
        reason = received_close_reason_;
        was_clean = (result == ERR_CONNECTION_CLOSED);
      }

      return DoDropChannel(was_clean, code, reason);
  }
}

ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
  if (frame->header.masked) {
    // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
    // masked frame."
    return FailChannel(
        "A server must not mask any frames that it sends to the "
        "client.",
        kWebSocketErrorProtocolError,
        "Masked frame from server");
  }
  const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
  DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
         frame->header.final);
  if (frame->header.reserved1 || frame->header.reserved2 ||
      frame->header.reserved3) {
    return FailChannel(base::StringPrintf(
                           "One or more reserved bits are on: reserved1 = %d, "
                           "reserved2 = %d, reserved3 = %d",
                           static_cast<int>(frame->header.reserved1),
                           static_cast<int>(frame->header.reserved2),
                           static_cast<int>(frame->header.reserved3)),
                       kWebSocketErrorProtocolError,
                       "Invalid reserved bit");
  }

  // Respond to the frame appropriately to its type.
  return HandleFrameByState(
      opcode, frame->header.final, frame->data, frame->header.payload_length);
}

ChannelState WebSocketChannel::HandleFrameByState(
    const WebSocketFrameHeader::OpCode opcode,
    bool final,
    const scoped_refptr<IOBuffer>& data_buffer,
    size_t size) {
  DCHECK_NE(RECV_CLOSED, state_)
      << "HandleFrame() does not support being called re-entrantly from within "
         "SendClose()";
  DCHECK_NE(CLOSED, state_);
  if (state_ == CLOSE_WAIT) {
    std::string frame_name;
    GetFrameTypeForOpcode(opcode, &frame_name);

    // FailChannel() won't send another Close frame.
    return FailChannel(
        frame_name + " received after close", kWebSocketErrorProtocolError, "");
  }
  switch (opcode) {
    case WebSocketFrameHeader::kOpCodeText:  // fall-thru
    case WebSocketFrameHeader::kOpCodeBinary:
    case WebSocketFrameHeader::kOpCodeContinuation:
      return HandleDataFrame(opcode, final, data_buffer, size);

    case WebSocketFrameHeader::kOpCodePing:
      DVLOG(1) << "Got Ping of size " << size;
      if (state_ == CONNECTED)
        return SendFrameFromIOBuffer(
            true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
      DVLOG(3) << "Ignored ping in state " << state_;
      return CHANNEL_ALIVE;

    case WebSocketFrameHeader::kOpCodePong:
      DVLOG(1) << "Got Pong of size " << size;
      // There is no need to do anything with pong messages.
      return CHANNEL_ALIVE;

    case WebSocketFrameHeader::kOpCodeClose: {
      // TODO(ricea): If there is a message which is queued for transmission to
      // the renderer, then the renderer should not receive an
      // OnClosingHandshake or OnDropChannel IPC until the queued message has
      // been completedly transmitted.
      uint16 code = kWebSocketNormalClosure;
      std::string reason;
      std::string message;
      if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
        return FailChannel(message, code, reason);
      }
      // TODO(ricea): Find a way to safely log the message from the close
      // message (escape control codes and so on).
      DVLOG(1) << "Got Close with code " << code;
      switch (state_) {
        case CONNECTED:
          SetState(RECV_CLOSED);
          if (SendClose(code, reason) == CHANNEL_DELETED)
            return CHANNEL_DELETED;
          DCHECK_EQ(RECV_CLOSED, state_);
          SetState(CLOSE_WAIT);

          if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
            return CHANNEL_DELETED;
          received_close_code_ = code;
          received_close_reason_ = reason;
          break;

        case SEND_CLOSED:
          SetState(CLOSE_WAIT);
          // From RFC6455 section 7.1.5: "Each endpoint
          // will see the status code sent by the other end as _The WebSocket
          // Connection Close Code_."
          received_close_code_ = code;
          received_close_reason_ = reason;
          break;

        default:
          LOG(DFATAL) << "Got Close in unexpected state " << state_;
          break;
      }
      return CHANNEL_ALIVE;
    }

    default:
      return FailChannel(
          base::StringPrintf("Unrecognized frame opcode: %d", opcode),
          kWebSocketErrorProtocolError,
          "Unknown opcode");
  }
}

ChannelState WebSocketChannel::HandleDataFrame(
    WebSocketFrameHeader::OpCode opcode,
    bool final,
    const scoped_refptr<IOBuffer>& data_buffer,
    size_t size) {
  if (state_ != CONNECTED) {
    DVLOG(3) << "Ignored data packet received in state " << state_;
    return CHANNEL_ALIVE;
  }
  DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
         opcode == WebSocketFrameHeader::kOpCodeText ||
         opcode == WebSocketFrameHeader::kOpCodeBinary);
  const bool got_continuation =
      (opcode == WebSocketFrameHeader::kOpCodeContinuation);
  if (got_continuation != expecting_to_handle_continuation_) {
    const std::string console_log = got_continuation
        ? "Received unexpected continuation frame."
        : "Received start of new message but previous message is unfinished.";
    const std::string reason = got_continuation
        ? "Unexpected continuation"
        : "Previous data frame unfinished";
    return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
  }
  expecting_to_handle_continuation_ = !final;
  WebSocketFrameHeader::OpCode opcode_to_send = opcode;
  if (!initial_frame_forwarded_ &&
      opcode == WebSocketFrameHeader::kOpCodeContinuation) {
    opcode_to_send = receiving_text_message_
                         ? WebSocketFrameHeader::kOpCodeText
                         : WebSocketFrameHeader::kOpCodeBinary;
  }
  if (opcode == WebSocketFrameHeader::kOpCodeText ||
      (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
       receiving_text_message_)) {
    // This call is not redundant when size == 0 because it tells us what
    // the current state is.
    StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
        size ? data_buffer->data() : NULL, size);
    if (state == StreamingUtf8Validator::INVALID ||
        (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
      return FailChannel("Could not decode a text frame as UTF-8.",
                         kWebSocketErrorProtocolError,
                         "Invalid UTF-8 in text frame");
    }
    receiving_text_message_ = !final;
    DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
  }
  if (size == 0U && !final)
    return CHANNEL_ALIVE;

  initial_frame_forwarded_ = !final;
  if (size > base::checked_cast<size_t>(current_receive_quota_) ||
      !pending_received_frames_.empty()) {
    const bool no_quota = (current_receive_quota_ == 0);
    DCHECK(no_quota || pending_received_frames_.empty());
    DVLOG(3) << "Queueing frame to renderer due to quota. quota="
             << current_receive_quota_ << " size=" << size;
    WebSocketFrameHeader::OpCode opcode_to_queue =
        no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
    pending_received_frames_.push(PendingReceivedFrame(
        final, opcode_to_queue, data_buffer, current_receive_quota_, size));
    if (no_quota)
      return CHANNEL_ALIVE;
    size = current_receive_quota_;
    final = false;
  }

  // TODO(ricea): Can this copy be eliminated?
  const char* const data_begin = size ? data_buffer->data() : NULL;
  const char* const data_end = data_begin + size;
  const std::vector<char> data(data_begin, data_end);
  current_receive_quota_ -= size;
  DCHECK_GE(current_receive_quota_, 0);

  // Sends the received frame to the renderer process.
  return event_interface_->OnDataFrame(final, opcode_to_send, data);
}

ChannelState WebSocketChannel::SendFrameFromIOBuffer(
    bool fin,
    WebSocketFrameHeader::OpCode op_code,
    const scoped_refptr<IOBuffer>& buffer,
    size_t size) {
  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
  DCHECK(stream_);

  scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
  WebSocketFrameHeader& header = frame->header;
  header.final = fin;
  header.masked = true;
  header.payload_length = size;
  frame->data = buffer;

  if (data_being_sent_) {
    // Either the link to the WebSocket server is saturated, or several messages
    // are being sent in a batch.
    // TODO(ricea): Keep some statistics to work out the situation and adjust
    // quota appropriately.
    if (!data_to_send_next_)
      data_to_send_next_.reset(new SendBuffer);
    data_to_send_next_->AddFrame(frame.Pass());
    return CHANNEL_ALIVE;
  }

  data_being_sent_.reset(new SendBuffer);
  data_being_sent_->AddFrame(frame.Pass());
  return WriteFrames();
}

ChannelState WebSocketChannel::FailChannel(const std::string& message,
                                           uint16 code,
                                           const std::string& reason) {
  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
  DCHECK_NE(CONNECTING, state_);
  DCHECK_NE(CLOSED, state_);

  // TODO(ricea): Logging.
  if (state_ == CONNECTED) {
    if (SendClose(code, reason) == CHANNEL_DELETED)
      return CHANNEL_DELETED;
  }

  // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
  // should close the connection itself without waiting for the closing
  // handshake.
  stream_->Close();
  SetState(CLOSED);
  return event_interface_->OnFailChannel(message);
}

ChannelState WebSocketChannel::SendClose(uint16 code,
                                         const std::string& reason) {
  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
  DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
  scoped_refptr<IOBuffer> body;
  size_t size = 0;
  if (code == kWebSocketErrorNoStatusReceived) {
    // Special case: translate kWebSocketErrorNoStatusReceived into a Close
    // frame with no payload.
    DCHECK(reason.empty());
    body = new IOBuffer(0);
  } else {
    const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
    body = new IOBuffer(payload_length);
    size = payload_length;
    base::WriteBigEndian(body->data(), code);
    COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
                   they_should_both_be_two);
    std::copy(
        reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
  }
  // This use of base::Unretained() is safe because we stop the timer in the
  // destructor.
  timer_.Start(
      FROM_HERE,
      timeout_,
      base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
  if (SendFrameFromIOBuffer(
          true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
      CHANNEL_DELETED)
    return CHANNEL_DELETED;
  return CHANNEL_ALIVE;
}

bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
                                  size_t size,
                                  uint16* code,
                                  std::string* reason,
                                  std::string* message) {
  reason->clear();
  if (size < kWebSocketCloseCodeLength) {
    if (size == 0U) {
      *code = kWebSocketErrorNoStatusReceived;
      return true;
    }

    DVLOG(1) << "Close frame with payload size " << size << " received "
             << "(the first byte is " << std::hex
             << static_cast<int>(buffer->data()[0]) << ")";
    *code = kWebSocketErrorProtocolError;
    *message =
        "Received a broken close frame containing an invalid size body.";
    return false;
  }

  const char* data = buffer->data();
  uint16 unchecked_code = 0;
  base::ReadBigEndian(data, &unchecked_code);
  COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
                 they_should_both_be_two_bytes);

  switch (unchecked_code) {
    case kWebSocketErrorNoStatusReceived:
    case kWebSocketErrorAbnormalClosure:
    case kWebSocketErrorTlsHandshake:
      *code = kWebSocketErrorProtocolError;
      *message =
          "Received a broken close frame containing a reserved status code.";
      return false;

    default:
      *code = unchecked_code;
      break;
  }

  std::string text(data + kWebSocketCloseCodeLength, data + size);
  if (StreamingUtf8Validator::Validate(text)) {
    reason->swap(text);
    return true;
  }

  *code = kWebSocketErrorProtocolError;
  *reason = "Invalid UTF-8 in Close frame";
  *message = "Received a broken close frame containing invalid UTF-8.";
  return false;
}

ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
                                             uint16 code,
                                             const std::string& reason) {
  if (CHANNEL_DELETED ==
      notification_sender_->SendImmediately(event_interface_.get()))
    return CHANNEL_DELETED;
  ChannelState result =
      event_interface_->OnDropChannel(was_clean, code, reason);
  DCHECK_EQ(CHANNEL_DELETED, result);
  return result;
}

void WebSocketChannel::CloseTimeout() {
  stream_->Close();
  SetState(CLOSED);
  AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
  // |this| has been deleted.
}

}  // namespace net