// Plain text  |  485 lines  |  15.44 KB

// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <algorithm>
#include <limits>

#include "net/websockets/websocket.h"

#include "base/message_loop.h"
#include "net/base/host_resolver.h"
#include "net/websockets/websocket_handshake.h"
#include "net/websockets/websocket_handshake_draft75.h"

namespace net {

// The closing frame: a 0xFF frame-type byte followed by a zero length byte.
static const char kClosingFrame[2] = {'\xff', '\x00'};
// Initial value of |closing_handshake_timeout_|.  Declared const so that this
// file-local default cannot be modified at run time.
static const int64 kClosingHandshakeTimeout = 1000;  // msec.

// Creates a WebSocket in the INITIALIZED state for |request|, with events
// reported to |delegate|.  The message loop that is current at construction
// time is stored in |origin_loop_|.  No socket stream, handshake object or
// read/write buffer exists yet; all closing-handshake flags start out false.
// |request|, |delegate| and the current message loop must be non-NULL
// (DCHECKed).
WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate)
    : ready_state_(INITIALIZED),
      request_(request),
      handshake_(NULL),
      delegate_(delegate),
      origin_loop_(MessageLoop::current()),
      socket_stream_(NULL),
      max_pending_send_allowed_(0),
      current_read_buf_(NULL),
      read_consumed_len_(0),
      current_write_buf_(NULL),
      server_closing_handshake_(false),
      client_closing_handshake_(false),
      closing_handshake_started_(false),
      force_close_task_(NULL),
      closing_handshake_timeout_(kClosingHandshakeTimeout) {
  DCHECK(request_.get());
  DCHECK(delegate_);
  DCHECK(origin_loop_);
}

// The destructor releases nothing itself; it only asserts (via DCHECK) that
// the object is torn down in the expected condition: the delegate has been
// cleared and no socket stream is still attached.
WebSocket::~WebSocket() {
  DCHECK(ready_state_ == INITIALIZED || !delegate_);
  DCHECK(!socket_stream_);
  DCHECK(!delegate_);
}

// Starts connecting to the URL of |request_|.
//
// Expected to be called in the INITIALIZED state, on |origin_loop_|, while a
// delegate is attached and before any socket stream exists (all DCHECKed).
// Creates the SocketStream, passing |this| to its constructor, applies the
// request's context and its optional host resolver / client socket factory
// overrides, moves to CONNECTING and starts the stream.
void WebSocket::Connect() {
  DCHECK(ready_state_ == INITIALIZED);
  DCHECK(request_.get());
  DCHECK(delegate_);
  DCHECK(!socket_stream_);
  DCHECK(MessageLoop::current() == origin_loop_);

  socket_stream_ = new SocketStream(request_->url(), this);
  socket_stream_->set_context(request_->context());

  // Only override the resolver / socket factory when the request supplies one.
  if (request_->host_resolver())
    socket_stream_->SetHostResolver(request_->host_resolver());
  if (request_->client_socket_factory())
    socket_stream_->SetClientSocketFactory(request_->client_socket_factory());

  // Take a reference on ourselves for the lifetime of the connection.
  AddRef();  // Release in DoClose().
  ready_state_ = CONNECTING;
  socket_stream_->Connect();
}

void WebSocket::Send(const std::string& msg) {
  if (ready_state_ == CLOSING || ready_state_ == CLOSED) {
    return;
  }
  if (client_closing_handshake_) {
    // We must not send any data after we start the WebSocket closing handshake.
    return;
  }
  DCHECK(ready_state_ == OPEN);
  DCHECK(MessageLoop::current() == origin_loop_);

  IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2);
  char* p = buf->data();
  *p = '\0';
  memcpy(p + 1, msg.data(), msg.size());
  *(p + 1 + msg.size()) = '\xff';
  pending_write_bufs_.push_back(make_scoped_refptr(buf));
  SendPending();
}

// Begins closing the connection.  Must be called on |origin_loop_|.
//
// Exactly one of the following steps runs, the first that matches:
//  - INITIALIZED: nothing was ever connected; just become CLOSED.
//  - CLOSING / CLOSED: do nothing.
//  - DRAFT75 request: there is no closing handshake; close the stream.
//  - CONNECTING: fail the connection (asynchronously) and become CLOSING.
//  - Closing handshake not started: start it (asynchronously) and become
//    CLOSING.
//  - Otherwise: become CLOSING.
//
// The steps are mutually exclusive.  In particular, a connection that is
// still CONNECTING is only failed; no closing frame is queued for it, because
// the handshake with the server has not completed and the stream is about to
// be closed by FailConnection().
void WebSocket::Close() {
  DCHECK(MessageLoop::current() == origin_loop_);

  // If connection has not yet started, do nothing.
  if (ready_state_ == INITIALIZED) {
    DCHECK(!socket_stream_);
    ready_state_ = CLOSED;
    return;
  }

  // If the readyState attribute is in the CLOSING or CLOSED state, do nothing
  if (ready_state_ == CLOSING || ready_state_ == CLOSED)
    return;

  if (request_->version() == DRAFT75) {
    DCHECK(socket_stream_);
    socket_stream_->Close();
    return;
  }

  // If the WebSocket connection is not yet established, fail the WebSocket
  // connection and set the readyState attribute's value to CLOSING.
  if (ready_state_ == CONNECTING) {
    ready_state_ = CLOSING;
    origin_loop_->PostTask(
        FROM_HERE,
        NewRunnableMethod(this, &WebSocket::FailConnection));
    // Do not fall through: starting the closing handshake here would queue a
    // closing frame on a stream that FailConnection() is about to close.
    return;
  }

  // If the WebSocket closing handshake has not yet been started, start
  // the WebSocket closing handshake and set the readyState attribute's value
  // to CLOSING.
  if (!closing_handshake_started_) {
    origin_loop_->PostTask(
        FROM_HERE,
        NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
  }

  // In every remaining case the readyState attribute's value becomes CLOSING.
  ready_state_ = CLOSING;
}

// Drops the delegate so that no further events are delivered to it.
//
// A no-op when the delegate is already gone.  If the socket was never
// connected the state simply becomes CLOSED; if a connection exists and is
// not yet CLOSED, the underlying stream is closed.
void WebSocket::DetachDelegate() {
  if (delegate_ == NULL)
    return;
  delegate_ = NULL;

  if (ready_state_ == INITIALIZED) {
    // Connect() was never called, so there is no stream to tear down.
    DCHECK(!socket_stream_);
    ready_state_ = CLOSED;
  } else if (ready_state_ != CLOSED) {
    DCHECK(socket_stream_);
    socket_stream_->Close();
  }
}

// SocketStream callback: the stream is connected.
//
// Records |max_pending_send_allowed|, allocates the read buffer, creates the
// handshake object matching the request's protocol version, queues the client
// handshake message for writing and posts SendPending() to |origin_loop_|.
void WebSocket::OnConnected(SocketStream* socket_stream,
                            int max_pending_send_allowed) {
  DCHECK(socket_stream == socket_stream_);
  max_pending_send_allowed_ = max_pending_send_allowed;

  // Use |max_pending_send_allowed| as hint for initial size of read buffer.
  current_read_buf_ = new GrowableIOBuffer();
  current_read_buf_->SetCapacity(max_pending_send_allowed_);
  read_consumed_len_ = 0;

  DCHECK(!current_write_buf_);
  DCHECK(!handshake_.get());
  // Pick the handshake implementation for the requested protocol version.
  switch (request_->version()) {
    case DEFAULT_VERSION:
      handshake_.reset(new WebSocketHandshake(
          request_->url(), request_->origin(), request_->location(),
          request_->protocol()));
      break;
    case DRAFT75:
      handshake_.reset(new WebSocketHandshakeDraft75(
          request_->url(), request_->origin(), request_->location(),
          request_->protocol()));
      break;
    default:
      NOTREACHED() << "Unexpected protocol version:" << request_->version();
  }

  // Queue the client handshake as the first pending write; the actual send is
  // started from a task posted to |origin_loop_|.
  const std::string msg = handshake_->CreateClientHandshakeMessage();
  IOBufferWithSize* buf = new IOBufferWithSize(msg.size());
  memcpy(buf->data(), msg.data(), msg.size());
  pending_write_bufs_.push_back(make_scoped_refptr(buf));
  origin_loop_->PostTask(FROM_HERE,
                         NewRunnableMethod(this, &WebSocket::SendPending));
}

// SocketStream callback: |amount_sent| bytes of the in-flight write buffer
// have been written.
//
// Advances the in-flight buffer; once it is fully drained it is dropped and
// the matching entry is removed from the front of |pending_write_bufs_|.
// In either case SendPending() is posted to |origin_loop_| to continue.
void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) {
  DCHECK(socket_stream == socket_stream_);
  DCHECK(current_write_buf_);

  current_write_buf_->DidConsume(amount_sent);
  const int bytes_left = current_write_buf_->BytesRemaining();
  DCHECK_GE(bytes_left, 0);

  const bool frame_finished = (bytes_left == 0);
  if (frame_finished) {
    current_write_buf_ = NULL;
    pending_write_bufs_.pop_front();
  }

  origin_loop_->PostTask(FROM_HERE,
                         NewRunnableMethod(this, &WebSocket::SendPending));
}

// SocketStream callback: |len| bytes at |data| were received.
//
// The bytes are copied into the read buffer immediately; parsing is deferred
// to DoReceivedData(), which is posted to |origin_loop_|.
void WebSocket::OnReceivedData(SocketStream* socket_stream,
                               const char* data, int len) {
  DCHECK(socket_stream == socket_stream_);
  AddToReadBuffer(data, len);
  origin_loop_->PostTask(FROM_HERE,
                         NewRunnableMethod(this, &WebSocket::DoReceivedData));
}

// SocketStream callback: the stream was closed.  The actual teardown is done
// by DoClose(), which is posted to |origin_loop_|.  |socket_stream| is unused.
void WebSocket::OnClose(SocketStream* socket_stream) {
  origin_loop_->PostTask(FROM_HERE,
                         NewRunnableMethod(this, &WebSocket::DoClose));
}

// SocketStream callback: the stream reported |error|.  The error is forwarded
// to DoSocketError() through a task posted to |origin_loop_|.
// |socket_stream| is unused.
void WebSocket::OnError(const SocketStream* socket_stream, int error) {
  origin_loop_->PostTask(
      FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error));
}

// Write pump.  Runs on |origin_loop_|.
//
// If no buffer is in flight, the next queued buffer (if any) becomes the
// in-flight buffer.  Up to |max_pending_send_allowed_| bytes of the in-flight
// buffer are then handed to the socket stream.
//
// When nothing is in flight and the queue is empty, and the client side of
// the closing handshake has been requested, the closing frame has been fully
// written: the closing handshake is marked as started, and the stream is
// either closed right away (the server's closing frame was already seen) or
// a delayed force-close task is scheduled.
void WebSocket::SendPending() {
  DCHECK(MessageLoop::current() == origin_loop_);
  if (!socket_stream_) {
    DCHECK_EQ(CLOSED, ready_state_);
    return;
  }

  if (!current_write_buf_ && pending_write_bufs_.empty()) {
    // Nothing left to write.
    if (!client_closing_handshake_)
      return;

    // Already sent 0xFF and 0x00 bytes.
    // *The WebSocket closing handshake has started.*
    closing_handshake_started_ = true;

    if (server_closing_handshake_) {
      // 4.2 3-8-3 If the WebSocket connection is not already closed,
      // then close the WebSocket connection.
      // *The WebSocket closing handshake has finished*
      socket_stream_->Close();
      return;
    }

    // 5. Wait a user-agent-determined length of time, or until the
    // WebSocket connection is closed.
    force_close_task_ =
        NewRunnableMethod(this, &WebSocket::DoForceCloseConnection);
    origin_loop_->PostDelayedTask(
        FROM_HERE, force_close_task_, closing_handshake_timeout_);
    return;
  }

  if (!current_write_buf_) {
    // Promote the oldest queued buffer to the in-flight buffer.
    current_write_buf_ = new DrainableIOBuffer(
        pending_write_bufs_.front(), pending_write_bufs_.front()->size());
  }

  DCHECK_GT(current_write_buf_->BytesRemaining(), 0);
  const int chunk_len = std::min(current_write_buf_->BytesRemaining(),
                                 max_pending_send_allowed_);
  bool sent = socket_stream_->SendData(current_write_buf_->data(), chunk_len);
  DCHECK(sent);
}

// Consumes newly buffered data according to the current state.  Runs on
// |origin_loop_|.
//
//  - CONNECTING: feeds the unconsumed bytes to the handshake parser.  If the
//    parser needs more data, returns and waits for the next call.  If the
//    handshake did not end up in MODE_CONNECTED the stream is closed.
//    Otherwise the state becomes OPEN, the delegate is notified, and any
//    bytes following the handshake are processed as frame data by falling
//    through to the OPEN case.
//  - OPEN / CLOSING: parses frame data.
//  - CLOSED: nothing to do.
void WebSocket::DoReceivedData() {
  DCHECK(MessageLoop::current() == origin_loop_);
  // Hold a reference to this object for the duration of the call; delegate
  // callbacks are invoked from here.
  scoped_refptr<WebSocket> protect(this);
  switch (ready_state_) {
    case CONNECTING:
      {
        DCHECK(handshake_.get());
        DCHECK(current_read_buf_);
        // Unconsumed region of the read buffer.
        const char* data =
            current_read_buf_->StartOfBuffer() + read_consumed_len_;
        size_t len = current_read_buf_->offset() - read_consumed_len_;
        int eoh = handshake_->ReadServerHandshake(data, len);
        if (eoh < 0) {
          // Not enough data,  Retry when more data is available.
          return;
        }
        // Drop the |eoh| bytes that ReadServerHandshake() reported.
        SkipReadBuffer(eoh);
      }
      if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) {
        // Handshake failed.
        socket_stream_->Close();
        return;
      }
      ready_state_ = OPEN;
      if (delegate_)
        delegate_->OnOpen(this);
      if (current_read_buf_->offset() == read_consumed_len_) {
        // No remaining data after handshake message.
        break;
      }
      // FALL THROUGH
    case OPEN:
    case CLOSING:  // need to process closing-frame from server.
      ProcessFrameData();
      break;

    case CLOSED:
      // Closed just after DoReceivedData is queued on |origin_loop_|.
      break;
    default:
      NOTREACHED();
      break;
  }
}

// Parses as many complete frames as possible from the unconsumed part of
// |current_read_buf_|, then marks the parsed bytes as consumed.
//
// The high bit of the frame-type byte selects the framing:
//  - High bit set: a length-prefixed frame.  The length is encoded in base
//    128, most significant group first; the last length byte has its high bit
//    cleared.  A 0xFF frame of length 0 is the server's closing frame (unless
//    the request is DRAFT75).  Any other length-prefixed frame is skipped and
//    reported through the delegate's OnError().
//  - High bit clear: a frame terminated by a 0xFF byte.  Frame type 0x00 is
//    delivered through the delegate's OnMessage(); any other type is
//    discarded and reported through OnError().
//
// A trailing frame that has not been fully received is left in the buffer for
// the next call.  Once the server's closing frame has been seen, all further
// data is ignored.  If the encoded frame length would overflow an int the
// socket stream is closed.
void WebSocket::ProcessFrameData() {
  DCHECK(current_read_buf_);
  if (server_closing_handshake_) {
    // Any data on the connection after the 0xFF frame is discarded.
    return;
  }
  // Hold a reference to this object for the duration of the call; delegate
  // callbacks are invoked from here.
  scoped_refptr<WebSocket> protect(this);
  const char* start_frame =
      current_read_buf_->StartOfBuffer() + read_consumed_len_;
  const char* next_frame = start_frame;
  const char* p = next_frame;
  const char* end =
      current_read_buf_->StartOfBuffer() + current_read_buf_->offset();
  while (p < end) {
    // Let /error/ be false.
    bool error = false;

    // Handle the /frame type/ byte as follows.
    unsigned char frame_byte = static_cast<unsigned char>(*p++);
    if ((frame_byte & 0x80) == 0x80) {
      int length = 0;
      // Set once a length byte with the high bit cleared has been read, i.e.
      // the whole length field is present in the buffer.
      bool length_complete = false;
      while (p < end) {
        if (length > std::numeric_limits<int>::max() / 128) {
          // frame length overflow.
          socket_stream_->Close();
          return;
        }
        unsigned char c = static_cast<unsigned char>(*p);
        length = length * 128 + (c & 0x7f);
        ++p;
        if ((c & 0x80) != 0x80) {
          length_complete = true;
          break;
        }
      }
      // The frame is complete only if the length field was terminated and the
      // whole body lies inside the buffer.  A body that ends exactly at |end|
      // is complete (this is always the case for a closing frame that is the
      // last thing received).  Comparing against |end - p| also avoids
      // forming a pointer beyond the buffer for large lengths.
      if (!length_complete || length > end - p) {
        // Not enough data in buffer.
        break;
      }
      p += length;
      next_frame = p;
      if (request_->version() != DRAFT75 &&
          frame_byte == 0xFF && length == 0) {
        // 4.2 Data framing 3. Handle the /frame type/ byte.
        // 8. If the /frame type/ is 0xFF and the /length/ was 0, then
        // run the following substeps:
        // 1. If the WebSocket closing handshake has not yet started, then
        // start the WebSocket closing handshake.
        server_closing_handshake_ = true;
        if (!closing_handshake_started_) {
          origin_loop_->PostTask(
              FROM_HERE,
              NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
        } else {
          // If the WebSocket closing handshake has been started and
          // the WebSocket connection is not already closed, then close
          // the WebSocket connection.
          socket_stream_->Close();
        }
        return;
      }
      // 4.2 3-8 Otherwise, let /error/ be true.
      error = true;
    } else {
      const char* msg_start = p;
      // Scan for the 0xFF terminator.
      while (p < end && *p != '\xff')
        ++p;
      if (p < end && *p == '\xff') {
        if (frame_byte == 0x00) {
          if (delegate_) {
            delegate_->OnMessage(this, std::string(msg_start, p - msg_start));
          }
        } else {
          // Otherwise, discard the data and let /error/ to be true.
          error = true;
        }
        // Step over the terminator; the frame is fully consumed.
        ++p;
        next_frame = p;
      }
      // Otherwise the terminator has not arrived yet: |p| == |end|, the loop
      // ends and the partial frame stays in the buffer.
    }
    // If /error/ is true, then *a WebSocket error has been detected.*
    if (error && delegate_)
      delegate_->OnError(this);
  }
  SkipReadBuffer(next_frame - start_frame);
}

// Appends |len| bytes from |data| to |current_read_buf_|, enlarging the
// buffer first when the free space after the current offset is not larger
// than |len|.  The buffer's offset is advanced past the appended bytes.
void WebSocket::AddToReadBuffer(const char* data, int len) {
  DCHECK(current_read_buf_);

  // Grow so that exactly |len| bytes fit after the current offset.
  const bool must_grow = current_read_buf_->RemainingCapacity() <= len;
  if (must_grow) {
    const int wanted_capacity = current_read_buf_->offset() + len;
    current_read_buf_->SetCapacity(wanted_capacity);
  }

  DCHECK(current_read_buf_->RemainingCapacity() >= len);
  memcpy(current_read_buf_->data(), data, len);

  const int advanced_offset = current_read_buf_->offset() + len;
  current_read_buf_->set_offset(advanced_offset);
}

// Marks |len| more bytes of the read buffer as consumed.
//
// When the unconsumed tail is shorter than the consumed prefix and the free
// space at the end of the buffer is smaller than the consumed prefix, the
// buffer is compacted: the tail is moved to the start of the buffer and the
// consumed length is reset to zero.
void WebSocket::SkipReadBuffer(int len) {
  if (len == 0)
    return;
  DCHECK_GT(len, 0);

  read_consumed_len_ += len;
  const int unread = current_read_buf_->offset() - read_consumed_len_;
  DCHECK_GE(unread, 0);

  // Compaction is skipped unless both conditions described above hold.
  if (unread >= read_consumed_len_ ||
      current_read_buf_->RemainingCapacity() >= read_consumed_len_) {
    return;
  }

  // Before:
  // 0             v-read_consumed_len_  v-offset               v- capacity
  // |..processed..| .. unread ..        | .. RemainingCapacity |
  //
  // After:
  // 0read_consumed_len_  v- offset                             v- capacity
  // |.. unread ..        | ..  RemainingCapacity  ...          |
  char* const base = current_read_buf_->StartOfBuffer();
  memmove(base, base + read_consumed_len_, unread);
  read_consumed_len_ = 0;
  current_read_buf_->set_offset(unread);
}

void WebSocket::StartClosingHandshake() {
  // 4.2 *start the WebSocket closing handshake*.
  if (closing_handshake_started_ || client_closing_handshake_) {
    // 1. If the WebSocket closing handshake has started, then abort these
    // steps.
    return;
  }
  // 2.,3. Send a 0xFF and 0x00 byte to the server.
  client_closing_handshake_ = true;
  IOBufferWithSize* buf = new IOBufferWithSize(2);
  memcpy(buf->data(), kClosingFrame, 2);
  pending_write_bufs_.push_back(make_scoped_refptr(buf));
  SendPending();
}

// Body of the delayed task scheduled by SendPending() once our closing frame
// has been written.  Runs on |origin_loop_|: forgets the task pointer and
// fails the connection.
void WebSocket::DoForceCloseConnection() {
  // 4.2 *start the WebSocket closing handshake*
  // 6. If the WebSocket connection is not already closed, then close the
  // WebSocket connection.  (If this happens, then the closing handshake
  // doesn't finish.)
  DCHECK(MessageLoop::current() == origin_loop_);
  // The task is running now, so DoClose() must not try to cancel it later.
  force_close_task_ = NULL;
  FailConnection();
}

// 6.1 Client-initiated closure.
// *fail the WebSocket connection*.
// The user agent must close the WebSocket connection, and may report the
// problem to the user.
//
// Runs on |origin_loop_|.  Closes the socket stream if one is still attached;
// otherwise does nothing.
void WebSocket::FailConnection() {
  DCHECK(MessageLoop::current() == origin_loop_);
  if (socket_stream_)
    socket_stream_->Close();
}

void WebSocket::DoClose() {
  DCHECK(MessageLoop::current() == origin_loop_);
  if (force_close_task_) {
    // WebSocket connection is closed while waiting a user-agent-determined
    // length of time after *The WebSocket closing handshake has started*.
    force_close_task_->Cancel();
    force_close_task_ = NULL;
  }
  WebSocketDelegate* delegate = delegate_;
  delegate_ = NULL;
  ready_state_ = CLOSED;
  if (!socket_stream_)
    return;
  socket_stream_ = NULL;
  if (delegate)
    delegate->OnClose(this,
                      server_closing_handshake_ && closing_handshake_started_);
  Release();
}

// Reports the socket-level |error| to the delegate, if one is still attached.
// Runs on |origin_loop_|.
void WebSocket::DoSocketError(int error) {
  DCHECK(MessageLoop::current() == origin_loop_);
  if (delegate_)
    delegate_->OnSocketError(this, error);
}

}  // namespace net