// Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include <algorithm> #include <limits> #include "net/websockets/websocket.h" #include "base/message_loop.h" #include "net/base/host_resolver.h" #include "net/websockets/websocket_handshake.h" #include "net/websockets/websocket_handshake_draft75.h" namespace net { static const char kClosingFrame[2] = {'\xff', '\x00'}; static int64 kClosingHandshakeTimeout = 1000; // msec. WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate) : ready_state_(INITIALIZED), request_(request), handshake_(NULL), delegate_(delegate), origin_loop_(MessageLoop::current()), socket_stream_(NULL), max_pending_send_allowed_(0), current_read_buf_(NULL), read_consumed_len_(0), current_write_buf_(NULL), server_closing_handshake_(false), client_closing_handshake_(false), closing_handshake_started_(false), force_close_task_(NULL), closing_handshake_timeout_(kClosingHandshakeTimeout) { DCHECK(request_.get()); DCHECK(delegate_); DCHECK(origin_loop_); } WebSocket::~WebSocket() { DCHECK(ready_state_ == INITIALIZED || !delegate_); DCHECK(!socket_stream_); DCHECK(!delegate_); } void WebSocket::Connect() { DCHECK(ready_state_ == INITIALIZED); DCHECK(request_.get()); DCHECK(delegate_); DCHECK(!socket_stream_); DCHECK(MessageLoop::current() == origin_loop_); socket_stream_ = new SocketStream(request_->url(), this); socket_stream_->set_context(request_->context()); if (request_->host_resolver()) socket_stream_->SetHostResolver(request_->host_resolver()); if (request_->client_socket_factory()) socket_stream_->SetClientSocketFactory(request_->client_socket_factory()); AddRef(); // Release in DoClose(). ready_state_ = CONNECTING; socket_stream_->Connect(); } void WebSocket::Send(const std::string& msg) { if (ready_state_ == CLOSING || ready_state_ == CLOSED) { return; } if (client_closing_handshake_) { // We must not send any data after we start the WebSocket closing handshake. return; } DCHECK(ready_state_ == OPEN); DCHECK(MessageLoop::current() == origin_loop_); IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2); char* p = buf->data(); *p = '\0'; memcpy(p + 1, msg.data(), msg.size()); *(p + 1 + msg.size()) = '\xff'; pending_write_bufs_.push_back(make_scoped_refptr(buf)); SendPending(); } void WebSocket::Close() { DCHECK(MessageLoop::current() == origin_loop_); // If connection has not yet started, do nothing. if (ready_state_ == INITIALIZED) { DCHECK(!socket_stream_); ready_state_ = CLOSED; return; } // If the readyState attribute is in the CLOSING or CLOSED state, do nothing if (ready_state_ == CLOSING || ready_state_ == CLOSED) return; if (request_->version() == DRAFT75) { DCHECK(socket_stream_); socket_stream_->Close(); return; } // If the WebSocket connection is not yet established, fail the WebSocket // connection and set the readyState attribute's value to CLOSING. if (ready_state_ == CONNECTING) { ready_state_ = CLOSING; origin_loop_->PostTask( FROM_HERE, NewRunnableMethod(this, &WebSocket::FailConnection)); } // If the WebSocket closing handshake has not yet been started, start // the WebSocket closing handshake and set the readyState attribute's value // to CLOSING. if (!closing_handshake_started_) { ready_state_ = CLOSING; origin_loop_->PostTask( FROM_HERE, NewRunnableMethod(this, &WebSocket::StartClosingHandshake)); } // Otherwise, set the readyState attribute's value to CLOSING. ready_state_ = CLOSING; } void WebSocket::DetachDelegate() { if (!delegate_) return; delegate_ = NULL; if (ready_state_ == INITIALIZED) { DCHECK(!socket_stream_); ready_state_ = CLOSED; return; } if (ready_state_ != CLOSED) { DCHECK(socket_stream_); socket_stream_->Close(); } } void WebSocket::OnConnected(SocketStream* socket_stream, int max_pending_send_allowed) { DCHECK(socket_stream == socket_stream_); max_pending_send_allowed_ = max_pending_send_allowed; // Use |max_pending_send_allowed| as hint for initial size of read buffer. current_read_buf_ = new GrowableIOBuffer(); current_read_buf_->SetCapacity(max_pending_send_allowed_); read_consumed_len_ = 0; DCHECK(!current_write_buf_); DCHECK(!handshake_.get()); switch (request_->version()) { case DEFAULT_VERSION: handshake_.reset(new WebSocketHandshake( request_->url(), request_->origin(), request_->location(), request_->protocol())); break; case DRAFT75: handshake_.reset(new WebSocketHandshakeDraft75( request_->url(), request_->origin(), request_->location(), request_->protocol())); break; default: NOTREACHED() << "Unexpected protocol version:" << request_->version(); } const std::string msg = handshake_->CreateClientHandshakeMessage(); IOBufferWithSize* buf = new IOBufferWithSize(msg.size()); memcpy(buf->data(), msg.data(), msg.size()); pending_write_bufs_.push_back(make_scoped_refptr(buf)); origin_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &WebSocket::SendPending)); } void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) { DCHECK(socket_stream == socket_stream_); DCHECK(current_write_buf_); current_write_buf_->DidConsume(amount_sent); DCHECK_GE(current_write_buf_->BytesRemaining(), 0); if (current_write_buf_->BytesRemaining() == 0) { current_write_buf_ = NULL; pending_write_bufs_.pop_front(); } origin_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &WebSocket::SendPending)); } void WebSocket::OnReceivedData(SocketStream* socket_stream, const char* data, int len) { DCHECK(socket_stream == socket_stream_); AddToReadBuffer(data, len); origin_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &WebSocket::DoReceivedData)); } void WebSocket::OnClose(SocketStream* socket_stream) { origin_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &WebSocket::DoClose)); } void WebSocket::OnError(const SocketStream* socket_stream, int error) { origin_loop_->PostTask( FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error)); } void WebSocket::SendPending() { DCHECK(MessageLoop::current() == origin_loop_); if (!socket_stream_) { DCHECK_EQ(CLOSED, ready_state_); return; } if (!current_write_buf_) { if (pending_write_bufs_.empty()) { if (client_closing_handshake_) { // Already sent 0xFF and 0x00 bytes. // *The WebSocket closing handshake has started.* closing_handshake_started_ = true; if (server_closing_handshake_) { // 4.2 3-8-3 If the WebSocket connection is not already closed, // then close the WebSocket connection. // *The WebSocket closing handshake has finished* socket_stream_->Close(); } else { // 5. Wait a user-agent-determined length of time, or until the // WebSocket connection is closed. force_close_task_ = NewRunnableMethod(this, &WebSocket::DoForceCloseConnection); origin_loop_->PostDelayedTask( FROM_HERE, force_close_task_, closing_handshake_timeout_); } } return; } current_write_buf_ = new DrainableIOBuffer( pending_write_bufs_.front(), pending_write_bufs_.front()->size()); } DCHECK_GT(current_write_buf_->BytesRemaining(), 0); bool sent = socket_stream_->SendData( current_write_buf_->data(), std::min(current_write_buf_->BytesRemaining(), max_pending_send_allowed_)); DCHECK(sent); } void WebSocket::DoReceivedData() { DCHECK(MessageLoop::current() == origin_loop_); scoped_refptr<WebSocket> protect(this); switch (ready_state_) { case CONNECTING: { DCHECK(handshake_.get()); DCHECK(current_read_buf_); const char* data = current_read_buf_->StartOfBuffer() + read_consumed_len_; size_t len = current_read_buf_->offset() - read_consumed_len_; int eoh = handshake_->ReadServerHandshake(data, len); if (eoh < 0) { // Not enough data, Retry when more data is available. return; } SkipReadBuffer(eoh); } if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) { // Handshake failed. socket_stream_->Close(); return; } ready_state_ = OPEN; if (delegate_) delegate_->OnOpen(this); if (current_read_buf_->offset() == read_consumed_len_) { // No remaining data after handshake message. break; } // FALL THROUGH case OPEN: case CLOSING: // need to process closing-frame from server. ProcessFrameData(); break; case CLOSED: // Closed just after DoReceivedData is queued on |origin_loop_|. break; default: NOTREACHED(); break; } } void WebSocket::ProcessFrameData() { DCHECK(current_read_buf_); if (server_closing_handshake_) { // Any data on the connection after the 0xFF frame is discarded. return; } scoped_refptr<WebSocket> protect(this); const char* start_frame = current_read_buf_->StartOfBuffer() + read_consumed_len_; const char* next_frame = start_frame; const char* p = next_frame; const char* end = current_read_buf_->StartOfBuffer() + current_read_buf_->offset(); while (p < end) { // Let /error/ be false. bool error = false; // Handle the /frame type/ byte as follows. unsigned char frame_byte = static_cast<unsigned char>(*p++); if ((frame_byte & 0x80) == 0x80) { int length = 0; while (p < end) { if (length > std::numeric_limits<int>::max() / 128) { // frame length overflow. socket_stream_->Close(); return; } unsigned char c = static_cast<unsigned char>(*p); length = length * 128 + (c & 0x7f); ++p; if ((c & 0x80) != 0x80) break; } // Checks if the frame body hasn't been completely received yet. // It also checks the case the frame length bytes haven't been completely // received yet, because p == end and length > 0 in such case. if (p + length < end) { p += length; next_frame = p; if (request_->version() != DRAFT75 && frame_byte == 0xFF && length == 0) { // 4.2 Data framing 3. Handle the /frame type/ byte. // 8. If the /frame type/ is 0xFF and the /length/ was 0, then // run the following substeps: // 1. If the WebSocket closing handshake has not yet started, then // start the WebSocket closing handshake. server_closing_handshake_ = true; if (!closing_handshake_started_) { origin_loop_->PostTask( FROM_HERE, NewRunnableMethod(this, &WebSocket::StartClosingHandshake)); } else { // If the WebSocket closing handshake has been started and // the WebSocket connection is not already closed, then close // the WebSocket connection. socket_stream_->Close(); } return; } // 4.2 3-8 Otherwise, let /error/ be true. error = true; } else { // Not enough data in buffer. break; } } else { const char* msg_start = p; while (p < end && *p != '\xff') ++p; if (p < end && *p == '\xff') { if (frame_byte == 0x00) { if (delegate_) { delegate_->OnMessage(this, std::string(msg_start, p - msg_start)); } } else { // Otherwise, discard the data and let /error/ to be true. error = true; } ++p; next_frame = p; } } // If /error/ is true, then *a WebSocket error has been detected.* if (error && delegate_) delegate_->OnError(this); } SkipReadBuffer(next_frame - start_frame); } void WebSocket::AddToReadBuffer(const char* data, int len) { DCHECK(current_read_buf_); // Check if |current_read_buf_| has enough space to store |len| of |data|. if (len >= current_read_buf_->RemainingCapacity()) { current_read_buf_->SetCapacity( current_read_buf_->offset() + len); } DCHECK(current_read_buf_->RemainingCapacity() >= len); memcpy(current_read_buf_->data(), data, len); current_read_buf_->set_offset(current_read_buf_->offset() + len); } void WebSocket::SkipReadBuffer(int len) { if (len == 0) return; DCHECK_GT(len, 0); read_consumed_len_ += len; int remaining = current_read_buf_->offset() - read_consumed_len_; DCHECK_GE(remaining, 0); if (remaining < read_consumed_len_ && current_read_buf_->RemainingCapacity() < read_consumed_len_) { // Pre compaction: // 0 v-read_consumed_len_ v-offset v- capacity // |..processed..| .. remaining .. | .. RemainingCapacity | // memmove(current_read_buf_->StartOfBuffer(), current_read_buf_->StartOfBuffer() + read_consumed_len_, remaining); read_consumed_len_ = 0; current_read_buf_->set_offset(remaining); // Post compaction: // 0read_consumed_len_ v- offset v- capacity // |.. remaining .. | .. RemainingCapacity ... | // } } void WebSocket::StartClosingHandshake() { // 4.2 *start the WebSocket closing handshake*. if (closing_handshake_started_ || client_closing_handshake_) { // 1. If the WebSocket closing handshake has started, then abort these // steps. return; } // 2.,3. Send a 0xFF and 0x00 byte to the server. client_closing_handshake_ = true; IOBufferWithSize* buf = new IOBufferWithSize(2); memcpy(buf->data(), kClosingFrame, 2); pending_write_bufs_.push_back(make_scoped_refptr(buf)); SendPending(); } void WebSocket::DoForceCloseConnection() { // 4.2 *start the WebSocket closing handshake* // 6. If the WebSocket connection is not already closed, then close the // WebSocket connection. (If this happens, then the closing handshake // doesn't finish.) DCHECK(MessageLoop::current() == origin_loop_); force_close_task_ = NULL; FailConnection(); } void WebSocket::FailConnection() { DCHECK(MessageLoop::current() == origin_loop_); // 6.1 Client-initiated closure. // *fail the WebSocket connection*. // the user agent must close the WebSocket connection, and may report the // problem to the user. if (!socket_stream_) return; socket_stream_->Close(); } void WebSocket::DoClose() { DCHECK(MessageLoop::current() == origin_loop_); if (force_close_task_) { // WebSocket connection is closed while waiting a user-agent-determined // length of time after *The WebSocket closing handshake has started*. force_close_task_->Cancel(); force_close_task_ = NULL; } WebSocketDelegate* delegate = delegate_; delegate_ = NULL; ready_state_ = CLOSED; if (!socket_stream_) return; socket_stream_ = NULL; if (delegate) delegate->OnClose(this, server_closing_handshake_ && closing_handshake_started_); Release(); } void WebSocket::DoSocketError(int error) { DCHECK(MessageLoop::current() == origin_loop_); if (delegate_) delegate_->OnSocketError(this, error); } } // namespace net