// Plain text  |  476 lines  |  14.42 KB

// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/spdy/spdy_http_stream.h"

#include <algorithm>
#include <list>
#include <string>

#include "base/logging.h"
#include "base/message_loop.h"
#include "net/base/address_list.h"
#include "net/base/host_port_pair.h"
#include "net/base/load_flags.h"
#include "net/base/net_util.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_info.h"
#include "net/http/http_util.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_session.h"

namespace net {

// Builds an HTTP stream adapter on top of |spdy_session|.  No SpdyStream is
// attached yet, no response info or user callback is registered, and every
// flag and length starts cleared.  |direct| is stored as-is for later use when
// the request headers are built.
SpdyHttpStream::SpdyHttpStream(SpdySession* spdy_session, bool direct)
    : ALLOW_THIS_IN_INITIALIZER_LIST(read_callback_factory_(this)),
      stream_(NULL),
      spdy_session_(spdy_session),
      response_info_(NULL),
      download_finished_(false),
      response_headers_received_(false),
      user_callback_(NULL),
      user_buffer_len_(0),
      buffered_read_callback_pending_(false),
      more_read_data_pending_(false),
      direct_(direct) {
}

// Adopts |spdy_stream| as the underlying stream, registers this object as its
// delegate, and then marks the response headers as already received.
void SpdyHttpStream::InitializeWithExistingStream(SpdyStream* spdy_stream) {
  stream_ = spdy_stream;
  spdy_stream->SetDelegate(this);
  response_headers_received_ = true;
}

// Detaches this object from the underlying stream, if one is attached, so the
// stream no longer holds it as its delegate.
SpdyHttpStream::~SpdyHttpStream() {
  if (!stream_)
    return;
  stream_->DetachDelegate();
}

// Binds this object to a SpdyStream for |request_info|.
//
// Returns ERR_CONNECTION_CLOSED if the session is already closed.  For GET
// requests the session is first asked for a matching pushed stream; a failure
// from that lookup is returned directly.  If a stream is attached after that
// step, OK is returned; otherwise the result of asking the session to create
// a new stream (passing |callback| along) is returned.
int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
                                     const BoundNetLog& stream_net_log,
                                     CompletionCallback* callback) {
  DCHECK(!stream_.get());
  if (spdy_session_->IsClosed())
    return ERR_CONNECTION_CLOSED;

  request_info_ = request_info;

  // Only GET requests are looked up among the pushed streams.
  const bool lookup_pushed_stream = request_info_->method == "GET";
  if (lookup_pushed_stream) {
    const int push_result = spdy_session_->GetPushStream(
        request_info_->url, &stream_, stream_net_log);
    if (push_result != OK)
      return push_result;
  }

  if (!stream_.get()) {
    return spdy_session_->CreateStream(request_info_->url,
                                       request_info_->priority, &stream_,
                                       stream_net_log, callback);
  }

  return OK;
}

// Returns the response info object currently tracked by this stream; this is
// NULL until one has been set.
const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
  return response_info_;
}

// Returns the current position of the request body stream, or 0 when there is
// no request body stream.
uint64 SpdyHttpStream::GetUploadProgress() const {
  if (request_body_stream_.get())
    return request_body_stream_->position();

  return 0;
}

// Waits for the response headers.
//
// Returns the stream's response status if the stream is already closed, OK if
// the response has already been received, and otherwise stores |callback| and
// returns ERR_IO_PENDING.  |callback| must be non-NULL and the stream must
// not have been cancelled.
int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) {
  CHECK(callback);
  CHECK(!stream_->cancelled());

  if (stream_->closed())
    return stream_->response_status();

  if (!stream_->response_received()) {
    // Headers have not arrived yet: remember who to notify.
    CHECK(!user_callback_);
    user_callback_ = callback;
    return ERR_IO_PENDING;
  }

  // The response headers are already here, so finish synchronously.
  CHECK(stream_->is_idle());
  return OK;
}

// Reads response body bytes into |buf| (at most |buf_len| bytes).
//
// If body data is already buffered, as much as fits is copied out and the
// number of bytes copied is returned; when flow control is enabled the
// stream's receive window is increased by that amount.  If nothing is buffered
// and the stream is closed, the stream's response status is returned.
// Otherwise |buf|, |buf_len| and |callback| are remembered and ERR_IO_PENDING
// is returned.
int SpdyHttpStream::ReadResponseBody(
    IOBuffer* buf, int buf_len, CompletionCallback* callback) {
  CHECK(stream_->is_idle());
  CHECK(buf);
  CHECK(buf_len);
  CHECK(callback);

  if (response_body_.empty()) {
    if (stream_->closed())
      return stream_->response_status();

    // Nothing to hand out yet; park the read until data arrives.
    CHECK(!user_callback_);
    CHECK(!user_buffer_);
    CHECK_EQ(0, user_buffer_len_);

    user_callback_ = callback;
    user_buffer_ = buf;
    user_buffer_len_ = buf_len;
    return ERR_IO_PENDING;
  }

  // Drain buffered chunks into the caller's buffer until it is full or the
  // queue runs dry.
  int copied_total = 0;
  int space_left = buf_len;
  while (space_left > 0 && !response_body_.empty()) {
    scoped_refptr<IOBufferWithSize> chunk = response_body_.front();
    const int chunk_size = chunk->size();
    const int copy_len = std::min(space_left, chunk_size);
    memcpy(buf->data() + copied_total, chunk->data(), copy_len);

    // |chunk| keeps the data alive after it is removed from the queue.
    response_body_.pop_front();
    if (copy_len != chunk_size) {
      // Only part of the chunk fit: put the unread tail back at the front.
      const int leftover = chunk_size - copy_len;
      IOBufferWithSize* tail = new IOBufferWithSize(leftover);
      memcpy(tail->data(), chunk->data() + copy_len, leftover);
      response_body_.push_front(make_scoped_refptr(tail));
    }

    space_left -= copy_len;
    copied_total += copy_len;
  }

  if (SpdySession::flow_control())
    stream_->IncreaseRecvWindowSize(copied_total);
  return copied_total;
}

// Closes the stream by cancelling it.  |not_reusable| is ignored because the
// flag has no meaning for SPDY streams.
void SpdyHttpStream::Close(bool not_reusable) {
  Cancel();
}

// Always returns NULL: no replacement stream is produced here.
HttpStream* SpdyHttpStream::RenewStreamForAuth() {
  return NULL;
}

// Reports whether the underlying stream is closed.  Returns false when no
// stream is attached.
bool SpdyHttpStream::IsResponseBodyComplete() const {
  if (stream_)
    return stream_->closed();
  return false;
}

// Always reports true.
bool SpdyHttpStream::CanFindEndOfResponse() const {
  return true;
}

// Always reports false.
bool SpdyHttpStream::IsMoreDataBuffered() const {
  return false;
}

// Forwards the question to the session: the answer is whatever the SPDY
// session reports from IsReused().
bool SpdyHttpStream::IsConnectionReused() const {
  return spdy_session_->IsReused();
}

// Intentionally a no-op: SPDY does not need a reuse indicator on the stream.
void SpdyHttpStream::SetConnectionReused() {
}

// Always reports false, since SPDY streams are not treated as reusable.
bool SpdyHttpStream::IsConnectionReusable() const {
  return false;
}

// Passes |callback| on to the request body stream.  Does nothing when there
// is no request body stream.
void SpdyHttpStream::set_chunk_callback(ChunkCallback* callback) {
  if (request_body_stream_ == NULL)
    return;
  request_body_stream_->set_chunk_callback(callback);
}

// Sends the request described by |request_headers| (and, optionally,
// |request_body|) on the attached stream.
//
// |request_body| may be NULL.  When non-NULL it is either retained in
// |request_body_stream_| (if it has a non-zero size or is chunked) or deleted
// here, so the caller must not use it afterwards.
// |response| must be non-NULL; it becomes |response_info_| and receives the
// peer's socket address.  If a pushed response was recorded earlier, it is
// copied into |response| first.
// |callback| must be non-NULL; it is stored only when the send is pending.
//
// Returns:
//   - ERR_FAILED, or the stream's non-OK response status, when a non-pushed
//     stream is already closed;
//   - the error from GetPeerAddress() if the peer address lookup fails;
//   - otherwise the result of SpdyStream::SendRequest(), which may be
//     ERR_IO_PENDING.
int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
                                UploadDataStream* request_body,
                                HttpResponseInfo* response,
                                CompletionCallback* callback) {
  // Captured first so the recorded request time precedes all other work.
  base::Time request_time = base::Time::Now();
  CHECK(stream_.get());

  stream_->SetDelegate(this);

  // Translate the HTTP request into a SPDY header block and hand it to the
  // stream.
  linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
  CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers,
                                   headers.get(), direct_);
  stream_->set_spdy_headers(headers);

  stream_->SetRequestTime(request_time);
  // This should only get called in the case of a request occurring
  // during server push that has already begun but hasn't finished,
  // so we set the response's request time to be the actual one
  if (response_info_)
    response_info_->request_time = request_time;

  // Take ownership of the body: keep it only if there is something to upload.
  CHECK(!request_body_stream_.get());
  if (request_body) {
    if (request_body->size() || request_body->is_chunked())
      request_body_stream_.reset(request_body);
    else
      delete request_body;
  }

  CHECK(callback);
  CHECK(!stream_->cancelled());
  CHECK(response);

  // A closed stream that was not pushed cannot carry this request.  Report
  // its error status, or ERR_FAILED when the status itself is OK.
  if (!stream_->pushed() && stream_->closed()) {
    if (stream_->response_status() == OK)
      return ERR_FAILED;
    else
      return stream_->response_status();
  }

  // SendRequest can be called in two cases.
  //
  // a) A client initiated request. In this case, |response_info_| should be
  //    NULL to start with.
  // b) A client request which matches a response that the server has already
  //    pushed.
  if (push_response_info_.get()) {
    // Case (b): transfer the already-received pushed response to the caller's
    // object and drop the temporary copy.
    *response = *(push_response_info_.get());
    push_response_info_.reset();
  }
  else
    DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);

  response_info_ = response;

  // Put the peer's IP address and port into the response.
  AddressList address;
  int result = stream_->GetPeerAddress(&address);
  if (result != OK)
    return result;
  response_info_->socket_address = HostPortPair::FromAddrInfo(address.head());

  // Tell the stream whether a body will follow the headers.
  bool has_upload_data = request_body_stream_.get() != NULL;
  result = stream_->SendRequest(has_upload_data);
  if (result == ERR_IO_PENDING) {
    // Completion will be reported later through |callback|.
    CHECK(!user_callback_);
    user_callback_ = callback;
  }
  return result;
}

// Cancels this stream.  In order: asks the session (if any) to cancel pending
// stream creations for |stream_|, clears the stored user callback, and cancels
// the attached stream (if any).
void SpdyHttpStream::Cancel() {
  if (spdy_session_) {
    spdy_session_->CancelPendingCreateStreams(&stream_);
  }

  user_callback_ = NULL;

  if (stream_) {
    stream_->Cancel();
  }
}

// Called once the headers have been sent.  Runs the stored user callback with
// |status| if one is registered.  Returns true when there is no request body
// stream, false otherwise.
bool SpdyHttpStream::OnSendHeadersComplete(int status) {
  if (user_callback_) {
    DoCallback(status);
  }

  const bool has_no_body = (request_body_stream_.get() == NULL);
  return has_no_body;
}

// Writes the request body stream's current buffer to the SPDY stream.
//
// Returns OK without writing when the buffer is empty; otherwise returns the
// result of SpdyStream::WriteStreamData().  The FIN flag is set for every
// non-chunked write, and for a chunked write only when the body stream reports
// that it is on its last chunk.
int SpdyHttpStream::OnSendBody() {
  CHECK(request_body_stream_.get());

  const int pending_len = static_cast<int>(request_body_stream_->buf_len());
  if (pending_len == 0)
    return OK;

  // TODO(satish): For non-chunked POST data, we set DATA_FLAG_FIN for all
  // blocks of data written out. This is wrong if the POST data was larger than
  // UploadDataStream::kBufSize as that is the largest buffer that
  // UploadDataStream returns at a time and we'll be setting the FIN flag for
  // each block of data written out.
  bool send_fin = true;
  if (request_body_stream_->is_chunked())
    send_fin = request_body_stream_->IsOnLastChunk();

  return stream_->WriteStreamData(
      request_body_stream_->buf(), pending_len,
      send_fin ? spdy::DATA_FLAG_FIN : spdy::DATA_FLAG_NONE);
}

// Called after a body write finishes.  Passes |status| to the body stream's
// MarkConsumedAndFillBuffer() and stores the stream's end-of-file state in
// |*eof|.
//
// Returns ERR_IO_PENDING when a chunked upload has not reached its end but has
// no buffered data at the moment; returns OK in every other case.
int SpdyHttpStream::OnSendBodyComplete(int status, bool* eof) {
  CHECK(request_body_stream_.get());

  request_body_stream_->MarkConsumedAndFillBuffer(status);

  const bool at_end = request_body_stream_->eof();
  *eof = at_end;
  if (at_end)
    return OK;

  if (!request_body_stream_->is_chunked())
    return OK;

  // Chunked upload with more to come: wait if the buffer is empty right now.
  if (request_body_stream_->buf_len())
    return OK;

  return ERR_IO_PENDING;
}

// Called when a response header block arrives on the stream.
//
// |response| is the received SPDY header block, |response_time| is the
// timestamp stored in the response info once the headers convert
// successfully, and |status| is passed to the user callback and returned on
// success.
//
// Returns:
//   - OK (after logging a warning) if response headers were already marked as
//     received;
//   - ERR_INCOMPLETE_SPDY_HEADERS if the header block cannot yet be converted
//     into an HTTP response;
//   - |status| otherwise.
int SpdyHttpStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response,
                                       base::Time response_time,
                                       int status) {
  // No caller-supplied response object yet: store the headers in a
  // stream-owned HttpResponseInfo, which SendRequest() later copies out.
  if (!response_info_) {
    DCHECK(stream_->pushed());
    push_response_info_.reset(new HttpResponseInfo);
    response_info_ = push_response_info_.get();
  }

  // If the response is already received, these headers are too late.
  if (response_headers_received_) {
    LOG(WARNING) << "SpdyHttpStream headers received after response started.";
    return OK;
  }

  // TODO(mbelshe): This is the time of all headers received, not just time
  // to first byte.
  // NOTE: this value is overwritten with |response_time| below on success; it
  // only survives when the conversion below fails.
  response_info_->response_time = base::Time::Now();

  if (!SpdyHeadersToHttpResponse(response, response_info_)) {
    // We might not have complete headers yet.
    return ERR_INCOMPLETE_SPDY_HEADERS;
  }

  response_headers_received_ = true;
  // Don't store the SSLInfo in the response here, HttpNetworkTransaction
  // will take care of that part.
  // |ssl_info| is a throwaway; only |was_npn_negotiated| is kept.
  SSLInfo ssl_info;
  stream_->GetSSLInfo(&ssl_info,
                      &response_info_->was_npn_negotiated);
  response_info_->request_time = stream_->GetRequestTime();
  response_info_->vary_data.Init(*request_info_, *response_info_->headers);
  // TODO(ahendrickson): This is recorded after the entire SYN_STREAM control
  // frame has been received and processed.  Move to framer?
  response_info_->response_time = response_time;

  // Notify a caller that is waiting for the headers, if any.
  if (user_callback_)
    DoCallback(status);
  return status;
}

void SpdyHttpStream::OnDataReceived(const char* data, int length) {
  // SpdyStream won't call us with data if the header block didn't contain a
  // valid set of headers.  So we don't expect to not have headers received
  // here.
  DCHECK(response_headers_received_);

  // Note that data may be received for a SpdyStream prior to the user calling
  // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
  // happen for server initiated streams.
  DCHECK(!stream_->closed() || stream_->pushed());
  if (length > 0) {
    // Save the received data.
    IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
    memcpy(io_buffer->data(), data, length);
    response_body_.push_back(make_scoped_refptr(io_buffer));

    if (user_buffer_) {
      // Handing small chunks of data to the caller creates measurable overhead.
      // We buffer data in short time-spans and send a single read notification.
      ScheduleBufferedReadCallback();
    }
  }
}

// Not expected to run: for HTTP streams the client sends no data while the
// stream is in the OPEN state, so this notification is never delivered.
void SpdyHttpStream::OnDataSent(int length) {
  NOTREACHED();
}

// Called when the stream closes with |status|.  On a clean close any pending
// buffered read is completed first.  If that did not run the user callback and
// one is still registered, it is run with |status|.
void SpdyHttpStream::OnClose(int status) {
  // The short-circuit ensures the buffered read is only attempted when the
  // stream closed successfully.
  const bool read_completed =
      (status == net::OK) && DoBufferedReadCallback();
  if (read_completed)
    return;

  if (user_callback_)
    DoCallback(status);
}

// Posts a delayed DoBufferedReadCallback() task on the current message loop.
// If one is already pending, no new task is posted; instead the stream notes
// that more data has arrived since the task was scheduled.
void SpdyHttpStream::ScheduleBufferedReadCallback() {
  if (!buffered_read_callback_pending_) {
    more_read_data_pending_ = false;
    buffered_read_callback_pending_ = true;

    const int kBufferTimeMs = 1;
    MessageLoop::current()->PostDelayedTask(
        FROM_HERE,
        read_callback_factory_.NewRunnableMethod(
            &SpdyHttpStream::DoBufferedReadCallback),
        kBufferTimeMs);
    return;
  }

  // A callback is already on its way; just record that more data showed up.
  more_read_data_pending_ = true;
}

// Checks to see if we should wait for more buffered data before notifying
// the caller.  Returns true if we should wait, false otherwise.
bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
  // If the response is complete, there is no point in waiting.
  if (stream_->closed())
    return false;

  int bytes_buffered = 0;
  std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it;
  for (it = response_body_.begin();
       it != response_body_.end() && bytes_buffered < user_buffer_len_;
       ++it)
    bytes_buffered += (*it)->size();

  return bytes_buffered < user_buffer_len_;
}

// Completes an outstanding user read from the buffered body data.
//
// Any scheduled invocations are revoked first.  Returns true only when the
// user callback was run with the result of the read.  Returns false when there
// is no usable stream (missing, errored or cancelled), when the callback was
// rescheduled to let more data accumulate, or when no user read is
// outstanding.
bool SpdyHttpStream::DoBufferedReadCallback() {
  read_callback_factory_.RevokeAll();
  buffered_read_callback_pending_ = false;

  // A cancelled or failed transaction has no read to complete.
  if (!stream_)
    return false;
  if (stream_->response_status() != OK)
    return false;
  if (stream_->cancelled())
    return false;

  // More data arrived while we were waiting.  If the user's buffer still is
  // not covered, keep buffering for another interval.
  if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
    ScheduleBufferedReadCallback();
    return false;
  }

  if (!user_buffer_)
    return false;

  int result = ReadResponseBody(user_buffer_, user_buffer_len_,
                                user_callback_);
  CHECK_NE(result, ERR_IO_PENDING);
  user_buffer_ = NULL;
  user_buffer_len_ = 0;
  DoCallback(result);
  return true;
}

void SpdyHttpStream::DoCallback(int rv) {
  CHECK_NE(rv, ERR_IO_PENDING);
  CHECK(user_callback_);

  // Since Run may result in being called back, clear user_callback_ in advance.
  CompletionCallback* c = user_callback_;
  user_callback_ = NULL;
  c->Run(rv);
}

// Fills |ssl_info| from the underlying stream.  The NPN flag that the stream
// also reports is received into a local and discarded.
void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
  DCHECK(stream_);

  bool npn_flag_ignored;
  stream_->GetSSLInfo(ssl_info, &npn_flag_ignored);
}

// Forwards to the underlying stream, which fills in |cert_request_info|.  A
// stream is expected to be attached.
void SpdyHttpStream::GetSSLCertRequestInfo(
    SSLCertRequestInfo* cert_request_info) {
  DCHECK(stream_);

  stream_->GetSSLCertRequestInfo(cert_request_info);
}

// Always reports true.
bool SpdyHttpStream::IsSpdyHttpStream() const {
  return true;
}

}  // namespace net