// [File-viewer metadata, not part of the source] plain text | 499 lines | 15.24 KB

// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/websockets/websocket_job.h"

#include <algorithm>

#include "base/lazy_instance.h"
#include "base/string_tokenizer.h"
#include "googleurl/src/gurl.h"
#include "net/base/net_errors.h"
#include "net/base/net_log.h"
#include "net/base/cookie_policy.h"
#include "net/base/cookie_store.h"
#include "net/base/io_buffer.h"
#include "net/http/http_util.h"
#include "net/url_request/url_request_context.h"
#include "net/websockets/websocket_frame_handler.h"
#include "net/websockets/websocket_handshake_handler.h"
#include "net/websockets/websocket_net_log_params.h"
#include "net/websockets/websocket_throttle.h"

namespace {

// Header names below are spelled in lower case.

// Cookie-carrying headers; stripped from the outgoing handshake request
// before the cookie store's value is appended.
const char* const kCookieHeaders[] = {
  "cookie",
  "cookie2",
};

// Cookie-setting headers; collected from, and then stripped out of, the
// handshake response.
const char* const kSetCookieHeaders[] = {
  "set-cookie",
  "set-cookie2",
};

// Protocol factory for WebSocket URLs: builds a WebSocketJob that reports to
// |delegate| and attaches to it a new SocketStream for |url| whose delegate is
// the job itself.
net::SocketStreamJob* WebSocketJobFactory(
    const GURL& url, net::SocketStream::Delegate* delegate) {
  net::WebSocketJob* const websocket_job = new net::WebSocketJob(delegate);
  net::SocketStream* const stream = new net::SocketStream(url, websocket_job);
  websocket_job->InitSocketStream(stream);
  return websocket_job;
}

// Helper whose construction registers WebSocketJobFactory as the
// SocketStreamJob factory for the "ws" and "wss" schemes. The constructor is
// private; only base::DefaultLazyInstanceTraits may instantiate it.
class WebSocketJobInitSingleton {
 private:
  friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;

  WebSocketJobInitSingleton() {
    RegisterScheme("ws");
    RegisterScheme("wss");
  }

  // Routes |scheme| to WebSocketJobFactory.
  static void RegisterScheme(const char* scheme) {
    net::SocketStreamJob::RegisterProtocolFactory(scheme, WebSocketJobFactory);
  }
};

// Lazy instance of WebSocketJobInitSingleton. Its constructor performs the
// "ws"/"wss" factory registration; WebSocketJob::EnsureInit() accesses it via
// Get().
static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init(
    base::LINKER_INITIALIZED);

}  // anonymous namespace

namespace net {

// static
// Touches the lazy WebSocketJobInitSingleton instance, whose constructor
// registers the "ws" and "wss" protocol factories.
void WebSocketJob::EnsureInit() {
  g_websocket_job_init.Get();
}

// Creates a job that reports to |delegate|. The job starts in the INITIALIZED
// state, not waiting and with no pending connection callback. Fresh handshake
// request/response handlers and send/receive frame handlers are allocated
// here, and the handshake byte counter and cookie-save index start at zero.
WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
    : delegate_(delegate),
      state_(INITIALIZED),
      waiting_(false),
      callback_(NULL),
      handshake_request_(new WebSocketHandshakeRequestHandler),
      handshake_response_(new WebSocketHandshakeResponseHandler),
      handshake_request_sent_(0),
      response_cookies_save_index_(0),
      send_frame_handler_(new WebSocketFrameHandler),
      receive_frame_handler_(new WebSocketFrameHandler) {
}

// By the time the job is destroyed it is expected to be CLOSED, with both the
// delegate and the socket already dropped (DetachDelegate() and OnClose() do
// this); the DCHECKs below verify those expectations.
WebSocketJob::~WebSocketJob() {
  DCHECK_EQ(CLOSED, state_);
  DCHECK(!delegate_);
  DCHECK(!socket_.get());
}

// Moves the job from INITIALIZED to CONNECTING and asks the underlying socket
// to connect. A socket must already be attached.
void WebSocketJob::Connect() {
  DCHECK(socket_.get());
  DCHECK_EQ(state_, INITIALIZED);
  // The state is updated before calling into the socket.
  state_ = CONNECTING;
  socket_->Connect();
}

// Sends |len| bytes at |data| according to the current state:
//  - CONNECTING: the bytes are treated as (part of) the handshake request.
//  - OPEN: the bytes are handed to the send frame handler and, if nothing is
//    currently in flight, the next buffer is written to the socket.
//  - any other state: nothing is sent.
// Returns false when the data was refused or sending failed.
bool WebSocketJob::SendData(const char* data, int len) {
  if (state_ == CONNECTING)
    return SendHandshakeRequest(data, len);

  // INITIALIZED, CLOSING and CLOSED never accept data.
  if (state_ != OPEN)
    return false;

  send_frame_handler_->AppendData(data, len);

  // A buffer is already being sent; the data appended above will go out from
  // SendPending() once the current buffer has been sent.
  if (send_frame_handler_->GetCurrentBuffer())
    return true;

  // Do not buffer sending data for now.  WebCore::SocketStreamHandle controls
  // traffic to keep the number of pending bytes below
  // max_pending_send_allowed, so a message larger than that limit should not
  // be buffered: if we don't call OnSentData, WebCore::SocketStreamHandle
  // would stop sending more data once the pending data reaches the limit.
  // TODO(ukai): Fix this to support compression for larger message.
  const int update_result = send_frame_handler_->UpdateCurrentBuffer(false);
  if (update_result <= 0) {
    // Zero means there is nothing to send yet; negative is an error.
    return update_result == 0;
  }

  DCHECK(!current_buffer_);
  current_buffer_ = new DrainableIOBuffer(
      send_frame_handler_->GetCurrentBuffer(),
      send_frame_handler_->GetCurrentBufferSize());
  return socket_->SendData(
      current_buffer_->data(), current_buffer_->BytesRemaining());
}

// Requests that the connection be closed.
//
// If a buffer is still being sent the job only enters CLOSING; SendPending()
// closes the socket once there is no more data to send. Otherwise the job
// becomes CLOSED and the socket is closed right away.
//
// Calling Close() on an already CLOSED job is a no-op: OnClose() and
// DetachDelegate() reset |socket_| to NULL when they mark the job CLOSED, so
// touching the socket here would dereference a null pointer, and a job closed
// by a previous Close() has already asked the socket to close.
void WebSocketJob::Close() {
  if (state_ == CLOSED)
    return;

  state_ = CLOSING;
  if (current_buffer_) {
    // Will close in SendPending.
    return;
  }
  state_ = CLOSED;
  socket_->Close();
}

// Puts the job back into the CONNECTING state and forwards |username| and
// |password| to the socket's RestartWithAuth().
void WebSocketJob::RestartWithAuth(
    const string16& username,
    const string16& password) {
  state_ = CONNECTING;
  socket_->RestartWithAuth(username, password);
}

// Severs the job from its delegate: marks the job CLOSED, removes it from the
// WebSocketThrottle queue, clears |delegate_|, detaches and drops the socket,
// and cancels any connection callback still pending from
// OnStartOpenConnection().
void WebSocketJob::DetachDelegate() {
  state_ = CLOSED;
  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
  WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();

  // Holds a reference for the rest of this function; presumably so that the
  // Release() below (or dropping |socket_|) cannot destroy |this| while it is
  // still executing.
  scoped_refptr<WebSocketJob> protect(this);

  delegate_ = NULL;
  if (socket_)
    socket_->DetachDelegate();
  socket_ = NULL;
  if (callback_) {
    // The pending callback will never be run; DoCallback() checks for NULL.
    waiting_ = false;
    callback_ = NULL;
    Release();  // Balanced with OnStartOpenConnection().
  }
}

// Called when |socket| is about to open its connection. Records the socket's
// addresses and enqueues the job in the WebSocketThrottle.
//
// Returns OK if the job is not waiting after being enqueued. Otherwise keeps
// |callback| (to be run later from DoCallback()), takes a reference on itself
// and returns ERR_IO_PENDING.
int WebSocketJob::OnStartOpenConnection(
    SocketStream* socket, CompletionCallback* callback) {
  DCHECK(!callback_);
  state_ = CONNECTING;
  addresses_.Copy(socket->address_list().head(), true);
  WebSocketThrottle::GetInstance()->PutInQueue(this);

  // |waiting_| is only inspected after PutInQueue(); presumably the throttle
  // sets it through SetWaiting() when this job has to wait -- confirm.
  if (waiting_) {
    callback_ = callback;
    AddRef();  // Balanced when callback_ becomes NULL.
    return ERR_IO_PENDING;
  }
  return OK;
}

// Relays the socket's "connected" notification to the delegate, unless the
// job has already been closed.
void WebSocketJob::OnConnected(
    SocketStream* socket, int max_pending_send_allowed) {
  if (state_ != CLOSED) {
    DCHECK_EQ(CONNECTING, state_);
    if (delegate_)
      delegate_->OnConnected(socket, max_pending_send_allowed);
  }
}

// Called when |socket| has written |amount_sent| bytes.
//
// While CONNECTING the bytes belong to the handshake request. Otherwise they
// are consumed from |current_buffer_|; once that buffer is fully written the
// delegate is told the size of the original (caller-supplied) data and
// SendPending() is scheduled to send the next buffer, if any.
void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
  DCHECK_NE(INITIALIZED, state_);
  if (state_ == CLOSED)
    return;
  if (state_ == CONNECTING) {
    OnSentHandshakeRequest(socket, amount_sent);
    return;
  }
  if (!delegate_)
    return;

  DCHECK(state_ == OPEN || state_ == CLOSING);
  DCHECK_GT(amount_sent, 0);
  DCHECK(current_buffer_);
  current_buffer_->DidConsume(amount_sent);
  if (current_buffer_->BytesRemaining() > 0) {
    // Part of the current buffer is still unsent; nothing to report yet.
    return;
  }

  // Report the size of the original buffer rather than the number of bytes
  // that were written to |socket|.
  const int original_size = send_frame_handler_->GetOriginalBufferSize();
  DCHECK_GT(original_size, 0);
  current_buffer_ = NULL;
  send_frame_handler_->ReleaseCurrentBuffer();
  delegate_->OnSentData(socket, original_size);
  MessageLoopForIO::current()->PostTask(
      FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending));
}

// Called when |len| bytes at |data| arrived on |socket|.
//
// While CONNECTING the bytes are fed to the handshake response handling.
// Otherwise they are run through the receive frame handler, and everything it
// yields is concatenated and passed to the delegate in a single call.
void WebSocketJob::OnReceivedData(
    SocketStream* socket, const char* data, int len) {
  DCHECK_NE(INITIALIZED, state_);
  if (state_ == CLOSED)
    return;
  if (state_ == CONNECTING) {
    OnReceivedHandshakeResponse(socket, data, len);
    return;
  }
  DCHECK(state_ == OPEN || state_ == CLOSING);

  receive_frame_handler_->AppendData(data, len);

  // Don't buffer receiving data for now.
  // TODO(ukai): fix performance of WebSocketFrameHandler.
  std::string frames;
  while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
    frames.append(receive_frame_handler_->GetCurrentBuffer()->data(),
                  receive_frame_handler_->GetCurrentBufferSize());
    receive_frame_handler_->ReleaseCurrentBuffer();
  }

  if (frames.empty() || !delegate_)
    return;
  delegate_->OnReceivedData(socket, frames.data(), frames.size());
}

// Called when |socket| has closed. Marks the job CLOSED, removes it from the
// WebSocketThrottle queue, drops the delegate and socket, cancels any pending
// connection callback, and finally notifies the (former) delegate.
void WebSocketJob::OnClose(SocketStream* socket) {
  state_ = CLOSED;
  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
  WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();

  // Holds a reference for the rest of this function; presumably so that
  // dropping |socket_| or the Release() below cannot destroy |this| before
  // the delegate has been notified.
  scoped_refptr<WebSocketJob> protect(this);

  // Remember the delegate locally: |delegate_| is cleared before the
  // notification is delivered.
  SocketStream::Delegate* delegate = delegate_;
  delegate_ = NULL;
  socket_ = NULL;
  if (callback_) {
    // The pending callback will never be run; DoCallback() checks for NULL.
    waiting_ = false;
    callback_ = NULL;
    Release();  // Balanced with OnStartOpenConnection().
  }
  if (delegate)
    delegate->OnClose(socket);
}

// Passes the authentication challenge on to the delegate, if one is still
// attached.
void WebSocketJob::OnAuthRequired(
    SocketStream* socket, AuthChallengeInfo* auth_info) {
  if (!delegate_)
    return;
  delegate_->OnAuthRequired(socket, auth_info);
}

void WebSocketJob::OnError(const SocketStream* socket, int error) {
  if (delegate_)
    delegate_->OnError(socket, error);
}

// Feeds |len| bytes at |data| to the handshake request handler. When
// ParseRequest() succeeds, the cookie header is added and the request is
// sent. Returns the result of ParseRequest().
bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
  DCHECK_EQ(state_, CONNECTING);
  const bool parsed = handshake_request_->ParseRequest(data, len);
  if (parsed) {
    // The handshake message is complete (it was buffered in
    // |handshake_request_|); decorate it with cookies and send it.
    AddCookieHeaderAndSend();
  }
  return parsed;
}

void WebSocketJob::AddCookieHeaderAndSend() {
  int policy = OK;
  if (socket_->context()->cookie_policy()) {
    GURL url_for_cookies = GetURLForCookies();
    policy = socket_->context()->cookie_policy()->CanGetCookies(
        url_for_cookies,
        url_for_cookies);
  }
  DCHECK_NE(ERR_IO_PENDING, policy);
  OnCanGetCookiesCompleted(policy);
}

void WebSocketJob::OnCanGetCookiesCompleted(int policy) {
  if (socket_ && delegate_ && state_ == CONNECTING) {
    handshake_request_->RemoveHeaders(
        kCookieHeaders, arraysize(kCookieHeaders));
    if (policy == OK) {
      // Add cookies, including HttpOnly cookies.
      if (socket_->context()->cookie_store()) {
        CookieOptions cookie_options;
        cookie_options.set_include_httponly();
        std::string cookie =
            socket_->context()->cookie_store()->GetCookiesWithOptions(
                GetURLForCookies(), cookie_options);
        if (!cookie.empty())
          handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
      }
    }

    const std::string& handshake_request = handshake_request_->GetRawRequest();
    handshake_request_sent_ = 0;
    socket_->net_log()->AddEvent(
        NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
        make_scoped_refptr(
            new NetLogWebSocketHandshakeParameter(handshake_request)));
    socket_->SendData(handshake_request.data(),
                      handshake_request.size());
  }
}

// Accounts for |amount_sent| bytes of the handshake request having been
// written. Once the whole raw request has gone out, the delegate is told the
// length of the original request and the request handler is released.
void WebSocketJob::OnSentHandshakeRequest(
    SocketStream* socket, int amount_sent) {
  DCHECK_EQ(state_, CONNECTING);
  handshake_request_sent_ += amount_sent;
  DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
  if (handshake_request_sent_ < handshake_request_->raw_length()) {
    // Part of the handshake request is still unsent.
    return;
  }

  // The handshake request has been sent. Report the size of the original
  // request to the delegate.
  if (delegate_) {
    delegate_->OnSentData(socket, handshake_request_->original_length());
  }
  handshake_request_.reset();
}

// Handles |len| bytes at |data| received while the job is CONNECTING.
//
// The bytes are fed to the handshake response handler until it reports a
// complete response. Any bytes beyond the handshake (in this call, or in
// later calls made while still CONNECTING) are frame data and are queued in
// the receive frame handler. When the response completes it is logged and
// cookie processing starts via SaveCookiesAndNotifyHeaderComplete().
void WebSocketJob::OnReceivedHandshakeResponse(
    SocketStream* socket, const char* data, int len) {
  DCHECK_EQ(state_, CONNECTING);
  if (handshake_response_->HasResponse()) {
    // If we already have the handshake response, received data should be
    // frame data, not handshake message.
    receive_frame_handler_->AppendData(data, len);
    return;
  }

  size_t response_length = handshake_response_->ParseRawResponse(data, len);
  if (!handshake_response_->HasResponse()) {
    // not yet. we need more data.
    return;
  }
  // handshake message is completed.
  socket_->net_log()->AddEvent(
      NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
      make_scoped_refptr(new NetLogWebSocketHandshakeParameter(
          handshake_response_->GetRawResponse())));

  // |len| is signed while |response_length| is unsigned, so the difference
  // must not be tested with "> 0": the subtraction would be carried out in
  // unsigned arithmetic and wrap around if |response_length| ever exceeded
  // |len|. Compare first, then subtract.
  if (len > 0 && static_cast<size_t>(len) > response_length) {
    // If we received extra data, it should be frame data.
    const int extra_length =
        static_cast<int>(static_cast<size_t>(len) - response_length);
    receive_frame_handler_->AppendData(data + response_length, extra_length);
  }
  SaveCookiesAndNotifyHeaderComplete();
}

// Starts processing the cookies of a completed handshake response: collects
// the values of the cookie-setting headers into |response_cookies_| and
// begins saving them one at a time with SaveNextCookie().
void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
  // The handshake message must be complete at this point.
  DCHECK(handshake_response_->HasResponse());

  // Start from a clean slate.
  response_cookies_save_index_ = 0;
  response_cookies_.clear();
  handshake_response_->GetHeaders(
      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);

  // Walk over the response cookies and attempt to persist each of them.
  SaveNextCookie();
}

void WebSocketJob::SaveNextCookie() {
  if (response_cookies_save_index_ == response_cookies_.size()) {
    response_cookies_.clear();
    response_cookies_save_index_ = 0;

    // Remove cookie headers, with malformed headers preserved.
    // Actual handshake should be done in WebKit.
    handshake_response_->RemoveHeaders(
        kSetCookieHeaders, arraysize(kSetCookieHeaders));
    std::string received_data = handshake_response_->GetResponse();
    // Don't buffer receiving data for now.
    // TODO(ukai): fix performance of WebSocketFrameHandler.
    while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
      received_data +=
          std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
                      receive_frame_handler_->GetCurrentBufferSize());
      receive_frame_handler_->ReleaseCurrentBuffer();
    }

    state_ = OPEN;
    if (delegate_)
      delegate_->OnReceivedData(
          socket_, received_data.data(), received_data.size());

    handshake_response_.reset();

    WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
    return;
  }

  int policy = OK;
  if (socket_->context()->cookie_policy()) {
    GURL url_for_cookies = GetURLForCookies();
    policy = socket_->context()->cookie_policy()->CanSetCookie(
        url_for_cookies,
        url_for_cookies,
        response_cookies_[response_cookies_save_index_]);
  }

  DCHECK_NE(ERR_IO_PENDING, policy);
  OnCanSetCookieCompleted(policy);
}

void WebSocketJob::OnCanSetCookieCompleted(int policy) {
  if (socket_ && delegate_ && state_ == CONNECTING) {
    if ((policy == OK || policy == OK_FOR_SESSION_ONLY) &&
        socket_->context()->cookie_store()) {
      CookieOptions options;
      options.set_include_httponly();
      if (policy == OK_FOR_SESSION_ONLY)
        options.set_force_session();
      GURL url_for_cookies = GetURLForCookies();
      socket_->context()->cookie_store()->SetCookieWithOptions(
          url_for_cookies, response_cookies_[response_cookies_save_index_],
          options);
    }
    response_cookies_save_index_++;
    SaveNextCookie();
  }
}

// Returns the socket's URL with its scheme replaced by "https" when the
// socket is secure and by "http" otherwise; this is the URL used for cookie
// policy checks and cookie store access.
GURL WebSocketJob::GetURLForCookies() const {
  GURL socket_url = socket_->url();
  const std::string http_scheme(socket_->is_secure() ? "https" : "http");
  url_canon::Replacements<char> scheme_replacement;
  scheme_replacement.SetScheme(
      http_scheme.c_str(), url_parse::Component(0, http_scheme.length()));
  return socket_url.ReplaceComponents(scheme_replacement);
}

// Returns the address list that OnStartOpenConnection() copied from the
// socket.
const AddressList& WebSocketJob::address_list() const {
  return addresses_;
}

// Marks the job as waiting. While the flag is set, OnStartOpenConnection()
// answers ERR_IO_PENDING; Wakeup() clears it.
void WebSocketJob::SetWaiting() {
  waiting_ = true;
}

// Returns whether the job is currently marked as waiting (see SetWaiting()).
bool WebSocketJob::IsWaiting() const {
  return waiting_;
}

// Ends the waiting state, if the job is in it, and schedules DoCallback() on
// the current IO message loop so that the pending connection callback gets
// run. Does nothing when the job is not waiting.
void WebSocketJob::Wakeup() {
  if (waiting_) {
    waiting_ = false;
    DCHECK(callback_);
    MessageLoopForIO::current()->PostTask(
        FROM_HERE, NewRunnableMethod(this, &WebSocketJob::DoCallback));
  }
}

void WebSocketJob::DoCallback() {
  // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
  if (callback_) {
    net::CompletionCallback* callback = callback_;
    callback_ = NULL;
    callback->Run(net::OK);
    Release();  // Balanced with OnStartOpenConnection().
  }
}

// Sends the next pending buffer of the send frame handler, if any. Scheduled
// from OnSentData() after a buffer has been fully written.
//
// When nothing is left to send and the job is CLOSING, the socket is closed.
void WebSocketJob::SendPending() {
  // This runs as a posted task. OnClose() or DetachDelegate() may have run
  // in the meantime and reset |socket_| to NULL; there is then nothing to
  // send to or close, and dereferencing |socket_| below would crash.
  if (!socket_.get())
    return;
  if (current_buffer_)
    return;
  // Current buffer is done.  Try next buffer if any.
  // Don't buffer sending data. See comment on case OPEN in SendData().
  if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
    // No more data to send.
    if (state_ == CLOSING)
      socket_->Close();
    return;
  }
  current_buffer_ = new DrainableIOBuffer(
      send_frame_handler_->GetCurrentBuffer(),
      send_frame_handler_->GetCurrentBufferSize());
  socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining());
}

}  // namespace net