// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/websockets/websocket_job.h"
#include <algorithm>
#include "base/lazy_instance.h"
#include "base/string_tokenizer.h"
#include "googleurl/src/gurl.h"
#include "net/base/net_errors.h"
#include "net/base/net_log.h"
#include "net/base/cookie_policy.h"
#include "net/base/cookie_store.h"
#include "net/base/io_buffer.h"
#include "net/http/http_util.h"
#include "net/url_request/url_request_context.h"
#include "net/websockets/websocket_frame_handler.h"
#include "net/websockets/websocket_handshake_handler.h"
#include "net/websockets/websocket_net_log_params.h"
#include "net/websockets/websocket_throttle.h"
namespace {
// lower-case header names.
const char* const kCookieHeaders[] = {
"cookie", "cookie2"
};
const char* const kSetCookieHeaders[] = {
"set-cookie", "set-cookie2"
};
net::SocketStreamJob* WebSocketJobFactory(
const GURL& url, net::SocketStream::Delegate* delegate) {
net::WebSocketJob* job = new net::WebSocketJob(delegate);
job->InitSocketStream(new net::SocketStream(url, job));
return job;
}
class WebSocketJobInitSingleton {
private:
friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
WebSocketJobInitSingleton() {
net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
}
};
static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init(
base::LINKER_INITIALIZED);
} // anonymous namespace
namespace net {
// static
void WebSocketJob::EnsureInit() {
g_websocket_job_init.Get();
}
WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
: delegate_(delegate),
state_(INITIALIZED),
waiting_(false),
callback_(NULL),
handshake_request_(new WebSocketHandshakeRequestHandler),
handshake_response_(new WebSocketHandshakeResponseHandler),
handshake_request_sent_(0),
response_cookies_save_index_(0),
send_frame_handler_(new WebSocketFrameHandler),
receive_frame_handler_(new WebSocketFrameHandler) {
}
WebSocketJob::~WebSocketJob() {
DCHECK_EQ(CLOSED, state_);
DCHECK(!delegate_);
DCHECK(!socket_.get());
}
void WebSocketJob::Connect() {
DCHECK(socket_.get());
DCHECK_EQ(state_, INITIALIZED);
state_ = CONNECTING;
socket_->Connect();
}
bool WebSocketJob::SendData(const char* data, int len) {
switch (state_) {
case INITIALIZED:
return false;
case CONNECTING:
return SendHandshakeRequest(data, len);
case OPEN:
{
send_frame_handler_->AppendData(data, len);
// If current buffer is sending now, this data will be sent in
// SendPending() after current data was sent.
// Do not buffer sending data for now. Since
// WebCore::SocketStreamHandle controls traffic to keep number of
// pending bytes less than max_pending_send_allowed, so when sending
// larger message than max_pending_send_allowed should not be buffered.
// If we don't call OnSentData, WebCore::SocketStreamHandle would stop
// sending more data when pending data reaches max_pending_send_allowed.
// TODO(ukai): Fix this to support compression for larger message.
int err = 0;
if (!send_frame_handler_->GetCurrentBuffer() &&
(err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) {
DCHECK(!current_buffer_);
current_buffer_ = new DrainableIOBuffer(
send_frame_handler_->GetCurrentBuffer(),
send_frame_handler_->GetCurrentBufferSize());
return socket_->SendData(
current_buffer_->data(), current_buffer_->BytesRemaining());
}
return err >= 0;
}
case CLOSING:
case CLOSED:
return false;
}
return false;
}
void WebSocketJob::Close() {
state_ = CLOSING;
if (current_buffer_) {
// Will close in SendPending.
return;
}
state_ = CLOSED;
socket_->Close();
}
void WebSocketJob::RestartWithAuth(
const string16& username,
const string16& password) {
state_ = CONNECTING;
socket_->RestartWithAuth(username, password);
}
void WebSocketJob::DetachDelegate() {
state_ = CLOSED;
WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
scoped_refptr<WebSocketJob> protect(this);
delegate_ = NULL;
if (socket_)
socket_->DetachDelegate();
socket_ = NULL;
if (callback_) {
waiting_ = false;
callback_ = NULL;
Release(); // Balanced with OnStartOpenConnection().
}
}
int WebSocketJob::OnStartOpenConnection(
SocketStream* socket, CompletionCallback* callback) {
DCHECK(!callback_);
state_ = CONNECTING;
addresses_.Copy(socket->address_list().head(), true);
WebSocketThrottle::GetInstance()->PutInQueue(this);
if (!waiting_)
return OK;
callback_ = callback;
AddRef(); // Balanced when callback_ becomes NULL.
return ERR_IO_PENDING;
}
void WebSocketJob::OnConnected(
SocketStream* socket, int max_pending_send_allowed) {
if (state_ == CLOSED)
return;
DCHECK_EQ(CONNECTING, state_);
if (delegate_)
delegate_->OnConnected(socket, max_pending_send_allowed);
}
void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
DCHECK_NE(INITIALIZED, state_);
if (state_ == CLOSED)
return;
if (state_ == CONNECTING) {
OnSentHandshakeRequest(socket, amount_sent);
return;
}
if (delegate_) {
DCHECK(state_ == OPEN || state_ == CLOSING);
DCHECK_GT(amount_sent, 0);
DCHECK(current_buffer_);
current_buffer_->DidConsume(amount_sent);
if (current_buffer_->BytesRemaining() > 0)
return;
// We need to report amount_sent of original buffer size, instead of
// amount sent to |socket|.
amount_sent = send_frame_handler_->GetOriginalBufferSize();
DCHECK_GT(amount_sent, 0);
current_buffer_ = NULL;
send_frame_handler_->ReleaseCurrentBuffer();
delegate_->OnSentData(socket, amount_sent);
MessageLoopForIO::current()->PostTask(
FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending));
}
}
void WebSocketJob::OnReceivedData(
SocketStream* socket, const char* data, int len) {
DCHECK_NE(INITIALIZED, state_);
if (state_ == CLOSED)
return;
if (state_ == CONNECTING) {
OnReceivedHandshakeResponse(socket, data, len);
return;
}
DCHECK(state_ == OPEN || state_ == CLOSING);
std::string received_data;
receive_frame_handler_->AppendData(data, len);
// Don't buffer receiving data for now.
// TODO(ukai): fix performance of WebSocketFrameHandler.
while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
received_data +=
std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
receive_frame_handler_->GetCurrentBufferSize());
receive_frame_handler_->ReleaseCurrentBuffer();
}
if (delegate_ && !received_data.empty())
delegate_->OnReceivedData(
socket, received_data.data(), received_data.size());
}
void WebSocketJob::OnClose(SocketStream* socket) {
state_ = CLOSED;
WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
scoped_refptr<WebSocketJob> protect(this);
SocketStream::Delegate* delegate = delegate_;
delegate_ = NULL;
socket_ = NULL;
if (callback_) {
waiting_ = false;
callback_ = NULL;
Release(); // Balanced with OnStartOpenConnection().
}
if (delegate)
delegate->OnClose(socket);
}
void WebSocketJob::OnAuthRequired(
SocketStream* socket, AuthChallengeInfo* auth_info) {
if (delegate_)
delegate_->OnAuthRequired(socket, auth_info);
}
void WebSocketJob::OnError(const SocketStream* socket, int error) {
if (delegate_)
delegate_->OnError(socket, error);
}
bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
DCHECK_EQ(state_, CONNECTING);
if (!handshake_request_->ParseRequest(data, len))
return false;
// handshake message is completed.
AddCookieHeaderAndSend();
// Just buffered in |handshake_request_|.
return true;
}
void WebSocketJob::AddCookieHeaderAndSend() {
int policy = OK;
if (socket_->context()->cookie_policy()) {
GURL url_for_cookies = GetURLForCookies();
policy = socket_->context()->cookie_policy()->CanGetCookies(
url_for_cookies,
url_for_cookies);
}
DCHECK_NE(ERR_IO_PENDING, policy);
OnCanGetCookiesCompleted(policy);
}
void WebSocketJob::OnCanGetCookiesCompleted(int policy) {
if (socket_ && delegate_ && state_ == CONNECTING) {
handshake_request_->RemoveHeaders(
kCookieHeaders, arraysize(kCookieHeaders));
if (policy == OK) {
// Add cookies, including HttpOnly cookies.
if (socket_->context()->cookie_store()) {
CookieOptions cookie_options;
cookie_options.set_include_httponly();
std::string cookie =
socket_->context()->cookie_store()->GetCookiesWithOptions(
GetURLForCookies(), cookie_options);
if (!cookie.empty())
handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
}
}
const std::string& handshake_request = handshake_request_->GetRawRequest();
handshake_request_sent_ = 0;
socket_->net_log()->AddEvent(
NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
make_scoped_refptr(
new NetLogWebSocketHandshakeParameter(handshake_request)));
socket_->SendData(handshake_request.data(),
handshake_request.size());
}
}
void WebSocketJob::OnSentHandshakeRequest(
SocketStream* socket, int amount_sent) {
DCHECK_EQ(state_, CONNECTING);
handshake_request_sent_ += amount_sent;
DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
if (handshake_request_sent_ >= handshake_request_->raw_length()) {
// handshake request has been sent.
// notify original size of handshake request to delegate.
if (delegate_)
delegate_->OnSentData(
socket,
handshake_request_->original_length());
handshake_request_.reset();
}
}
void WebSocketJob::OnReceivedHandshakeResponse(
SocketStream* socket, const char* data, int len) {
DCHECK_EQ(state_, CONNECTING);
if (handshake_response_->HasResponse()) {
// If we already has handshake response, received data should be frame
// data, not handshake message.
receive_frame_handler_->AppendData(data, len);
return;
}
size_t response_length = handshake_response_->ParseRawResponse(data, len);
if (!handshake_response_->HasResponse()) {
// not yet. we need more data.
return;
}
// handshake message is completed.
socket_->net_log()->AddEvent(
NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
make_scoped_refptr(new NetLogWebSocketHandshakeParameter(
handshake_response_->GetRawResponse())));
if (len - response_length > 0) {
// If we received extra data, it should be frame data.
receive_frame_handler_->AppendData(data + response_length,
len - response_length);
}
SaveCookiesAndNotifyHeaderComplete();
}
void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
// handshake message is completed.
DCHECK(handshake_response_->HasResponse());
response_cookies_.clear();
response_cookies_save_index_ = 0;
handshake_response_->GetHeaders(
kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
// Now, loop over the response cookies, and attempt to persist each.
SaveNextCookie();
}
void WebSocketJob::SaveNextCookie() {
if (response_cookies_save_index_ == response_cookies_.size()) {
response_cookies_.clear();
response_cookies_save_index_ = 0;
// Remove cookie headers, with malformed headers preserved.
// Actual handshake should be done in WebKit.
handshake_response_->RemoveHeaders(
kSetCookieHeaders, arraysize(kSetCookieHeaders));
std::string received_data = handshake_response_->GetResponse();
// Don't buffer receiving data for now.
// TODO(ukai): fix performance of WebSocketFrameHandler.
while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
received_data +=
std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
receive_frame_handler_->GetCurrentBufferSize());
receive_frame_handler_->ReleaseCurrentBuffer();
}
state_ = OPEN;
if (delegate_)
delegate_->OnReceivedData(
socket_, received_data.data(), received_data.size());
handshake_response_.reset();
WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
return;
}
int policy = OK;
if (socket_->context()->cookie_policy()) {
GURL url_for_cookies = GetURLForCookies();
policy = socket_->context()->cookie_policy()->CanSetCookie(
url_for_cookies,
url_for_cookies,
response_cookies_[response_cookies_save_index_]);
}
DCHECK_NE(ERR_IO_PENDING, policy);
OnCanSetCookieCompleted(policy);
}
void WebSocketJob::OnCanSetCookieCompleted(int policy) {
if (socket_ && delegate_ && state_ == CONNECTING) {
if ((policy == OK || policy == OK_FOR_SESSION_ONLY) &&
socket_->context()->cookie_store()) {
CookieOptions options;
options.set_include_httponly();
if (policy == OK_FOR_SESSION_ONLY)
options.set_force_session();
GURL url_for_cookies = GetURLForCookies();
socket_->context()->cookie_store()->SetCookieWithOptions(
url_for_cookies, response_cookies_[response_cookies_save_index_],
options);
}
response_cookies_save_index_++;
SaveNextCookie();
}
}
GURL WebSocketJob::GetURLForCookies() const {
GURL url = socket_->url();
std::string scheme = socket_->is_secure() ? "https" : "http";
url_canon::Replacements<char> replacements;
replacements.SetScheme(scheme.c_str(),
url_parse::Component(0, scheme.length()));
return url.ReplaceComponents(replacements);
}
const AddressList& WebSocketJob::address_list() const {
return addresses_;
}
void WebSocketJob::SetWaiting() {
waiting_ = true;
}
bool WebSocketJob::IsWaiting() const {
return waiting_;
}
void WebSocketJob::Wakeup() {
if (!waiting_)
return;
waiting_ = false;
DCHECK(callback_);
MessageLoopForIO::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&WebSocketJob::DoCallback));
}
void WebSocketJob::DoCallback() {
// |callback_| may be NULL if OnClose() or DetachDelegate() was called.
if (callback_) {
net::CompletionCallback* callback = callback_;
callback_ = NULL;
callback->Run(net::OK);
Release(); // Balanced with OnStartOpenConnection().
}
}
void WebSocketJob::SendPending() {
if (current_buffer_)
return;
// Current buffer is done. Try next buffer if any.
// Don't buffer sending data. See comment on case OPEN in SendData().
if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
// No more data to send.
if (state_ == CLOSING)
socket_->Close();
return;
}
current_buffer_ = new DrainableIOBuffer(
send_frame_handler_->GetCurrentBuffer(),
send_frame_handler_->GetCurrentBufferSize());
socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining());
}
} // namespace net