普通文本  |  604行  |  18.07 KB

// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/spdy/spdy_stream.h"

#include "base/logging.h"
#include "base/message_loop.h"
#include "base/values.h"
#include "net/spdy/spdy_session.h"

namespace net {

namespace {

class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters {
 public:
  NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,
                                        int delta,
                                        int window_size)
      : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
  virtual Value* ToValue() const {
    DictionaryValue* dict = new DictionaryValue();
    dict->SetInteger("id", static_cast<int>(stream_id_));
    dict->SetInteger("delta", delta_);
    dict->SetInteger("window_size", window_size_);
    return dict;
  }
 private:
  const spdy::SpdyStreamId stream_id_;
  const int delta_;
  const int window_size_;
  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter);
};

}

SpdyStream::SpdyStream(SpdySession* session,
                       spdy::SpdyStreamId stream_id,
                       bool pushed,
                       const BoundNetLog& net_log)
    : continue_buffering_data_(true),
      stream_id_(stream_id),
      priority_(0),
      stalled_by_flow_control_(false),
      send_window_size_(spdy::kSpdyStreamInitialWindowSize),
      recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
      pushed_(pushed),
      response_received_(false),
      session_(session),
      delegate_(NULL),
      request_time_(base::Time::Now()),
      response_(new spdy::SpdyHeaderBlock),
      io_state_(STATE_NONE),
      response_status_(OK),
      cancelled_(false),
      has_upload_data_(false),
      net_log_(net_log),
      send_bytes_(0),
      recv_bytes_(0) {
}

SpdyStream::~SpdyStream() {
  UpdateHistograms();
}

void SpdyStream::SetDelegate(Delegate* delegate) {
  CHECK(delegate);
  delegate_ = delegate;

  if (pushed_) {
    CHECK(response_received());
    MessageLoop::current()->PostTask(
        FROM_HERE, NewRunnableMethod(this,
                                     &SpdyStream::PushedStreamReplayData));
  } else {
    continue_buffering_data_ = false;
  }
}

void SpdyStream::PushedStreamReplayData() {
  if (cancelled_ || !delegate_)
    return;

  continue_buffering_data_ = false;

  int rv = delegate_->OnResponseReceived(*response_, response_time_, OK);
  if (rv == ERR_INCOMPLETE_SPDY_HEADERS) {
    // We don't have complete headers.  Assume we're waiting for another
    // HEADERS frame.  Since we don't have headers, we had better not have
    // any pending data frames.
    DCHECK_EQ(0U, pending_buffers_.size());
    return;
  }

  std::vector<scoped_refptr<IOBufferWithSize> > buffers;
  buffers.swap(pending_buffers_);
  for (size_t i = 0; i < buffers.size(); ++i) {
    // It is always possible that a callback to the delegate results in
    // the delegate no longer being available.
    if (!delegate_)
      break;
    if (buffers[i]) {
      delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
    } else {
      delegate_->OnDataReceived(NULL, 0);
      session_->CloseStream(stream_id_, net::OK);
      // Note: |this| may be deleted after calling CloseStream.
      DCHECK_EQ(buffers.size() - 1, i);
    }
  }
}

void SpdyStream::DetachDelegate() {
  if (delegate_)
    delegate_->set_chunk_callback(NULL);
  delegate_ = NULL;
  if (!closed())
    Cancel();
}

const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const {
  return request_;
}

void SpdyStream::set_spdy_headers(
    const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
  request_ = headers;
}

void SpdyStream::IncreaseSendWindowSize(int delta_window_size) {
  DCHECK_GE(delta_window_size, 1);
  int new_window_size = send_window_size_ + delta_window_size;

  // We should ignore WINDOW_UPDATEs received before or after this state,
  // since before means we've not written SYN_STREAM yet (i.e. it's too
  // early) and after means we've written a DATA frame with FIN bit.
  if (io_state_ != STATE_SEND_BODY_COMPLETE)
    return;

  // it's valid for send_window_size_ to become negative (via an incoming
  // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make
  // it positive; however, if send_window_size_ is positive and incoming
  // WINDOW_UPDATE makes it negative, we have an overflow.
  if (send_window_size_ > 0 && new_window_size < 0) {
    LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size
                 << "] for stream " << stream_id_
                 << " overflows send_window_size_ [current:"
                 << send_window_size_ << "]";
    session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
    return;
  }

  send_window_size_ = new_window_size;

  net_log_.AddEvent(
      NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
          stream_id_, delta_window_size, send_window_size_)));
  if (stalled_by_flow_control_) {
    stalled_by_flow_control_ = false;
    io_state_ = STATE_SEND_BODY;
    DoLoop(OK);
  }
}

void SpdyStream::DecreaseSendWindowSize(int delta_window_size) {
  // we only call this method when sending a frame, therefore
  // |delta_window_size| should be within the valid frame size range.
  DCHECK_GE(delta_window_size, 1);
  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);

  // |send_window_size_| should have been at least |delta_window_size| for
  // this call to happen.
  DCHECK_GE(send_window_size_, delta_window_size);

  send_window_size_ -= delta_window_size;

  net_log_.AddEvent(
      NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
          stream_id_, -delta_window_size, send_window_size_)));
}

void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) {
  DCHECK_GE(delta_window_size, 1);
  // By the time a read is isued, stream may become inactive.
  if (!session_->IsStreamActive(stream_id_))
    return;
  int new_window_size = recv_window_size_ + delta_window_size;
  if (recv_window_size_ > 0)
    DCHECK(new_window_size > 0);

  recv_window_size_ = new_window_size;
  net_log_.AddEvent(
      NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
          stream_id_, delta_window_size, recv_window_size_)));
  session_->SendWindowUpdate(stream_id_, delta_window_size);
}

void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) {
  DCHECK_GE(delta_window_size, 1);

  recv_window_size_ -= delta_window_size;
  net_log_.AddEvent(
      NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
          stream_id_, -delta_window_size, recv_window_size_)));

  // Since we never decrease the initial window size, we should never hit
  // a negative |recv_window_size_|, if we do, it's a flow-control violation.
  if (recv_window_size_ < 0)
    session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
}

int SpdyStream::GetPeerAddress(AddressList* address) const {
  return session_->GetPeerAddress(address);
}

int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
  return session_->GetLocalAddress(address);
}

bool SpdyStream::WasEverUsed() const {
  return session_->WasEverUsed();
}

base::Time SpdyStream::GetRequestTime() const {
  return request_time_;
}

void SpdyStream::SetRequestTime(base::Time t) {
  request_time_ = t;
}

int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) {
  int rv = OK;

  metrics_.StartStream();

  DCHECK(response_->empty());
  *response_ = response;  // TODO(ukai): avoid copy.

  recv_first_byte_time_ = base::TimeTicks::Now();
  response_time_ = base::Time::Now();

  // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then
  // the server has sent the SYN_REPLY too early.
  if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE)
    return ERR_SPDY_PROTOCOL_ERROR;
  if (pushed_)
    CHECK(io_state_ == STATE_NONE);
  io_state_ = STATE_OPEN;

  if (delegate_)
    rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
  // If delegate_ is not yet attached, we'll call OnResponseReceived after the
  // delegate gets attached to the stream.

  return rv;
}

int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) {
  DCHECK(!response_->empty());

  // Append all the headers into the response header block.
  for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin();
      it != headers.end(); ++it) {
    // Disallow duplicate headers.  This is just to be conservative.
    if ((*response_).find(it->first) != (*response_).end()) {
      LOG(WARNING) << "HEADERS duplicate header";
      response_status_ = ERR_SPDY_PROTOCOL_ERROR;
      return ERR_SPDY_PROTOCOL_ERROR;
    }

    (*response_)[it->first] = it->second;
  }

  int rv = OK;
  if (delegate_) {
    rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
    // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
    // headers before the response header block is complete.
    if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
      rv = OK;
  }
  return rv;
}

void SpdyStream::OnDataReceived(const char* data, int length) {
  DCHECK_GE(length, 0);

  // If we don't have a response, then the SYN_REPLY did not come through.
  // We cannot pass data up to the caller unless the reply headers have been
  // received.
  if (!response_received()) {
    session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
    return;
  }

  if (!delegate_ || continue_buffering_data_) {
    // It should be valid for this to happen in the server push case.
    // We'll return received data when delegate gets attached to the stream.
    if (length > 0) {
      IOBufferWithSize* buf = new IOBufferWithSize(length);
      memcpy(buf->data(), data, length);
      pending_buffers_.push_back(make_scoped_refptr(buf));
    } else {
      pending_buffers_.push_back(NULL);
      metrics_.StopStream();
      // Note: we leave the stream open in the session until the stream
      //       is claimed.
    }
    return;
  }

  CHECK(!closed());

  // A zero-length read means that the stream is being closed.
  if (!length) {
    metrics_.StopStream();
    session_->CloseStream(stream_id_, net::OK);
    // Note: |this| may be deleted after calling CloseStream.
    return;
  }

  if (session_->flow_control())
    DecreaseRecvWindowSize(length);

  // Track our bandwidth.
  metrics_.RecordBytes(length);
  recv_bytes_ += length;
  recv_last_byte_time_ = base::TimeTicks::Now();

  if (!delegate_) {
    // It should be valid for this to happen in the server push case.
    // We'll return received data when delegate gets attached to the stream.
    IOBufferWithSize* buf = new IOBufferWithSize(length);
    memcpy(buf->data(), data, length);
    pending_buffers_.push_back(make_scoped_refptr(buf));
    return;
  }

  delegate_->OnDataReceived(data, length);
}

// This function is only called when an entire frame is written.
void SpdyStream::OnWriteComplete(int bytes) {
  DCHECK_LE(0, bytes);
  send_bytes_ += bytes;
  if (cancelled() || closed())
    return;
  DoLoop(bytes);
}

void SpdyStream::OnChunkAvailable() {
  DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY ||
         io_state_ == STATE_SEND_BODY_COMPLETE);
  if (io_state_ == STATE_SEND_BODY)
    OnWriteComplete(0);
}

void SpdyStream::OnClose(int status) {
  io_state_ = STATE_DONE;
  response_status_ = status;
  Delegate* delegate = delegate_;
  delegate_ = NULL;
  if (delegate) {
    delegate->set_chunk_callback(NULL);
    delegate->OnClose(status);
  }
}

void SpdyStream::Cancel() {
  if (cancelled())
    return;

  cancelled_ = true;
  if (session_->IsStreamActive(stream_id_))
    session_->ResetStream(stream_id_, spdy::CANCEL);
}

int SpdyStream::SendRequest(bool has_upload_data) {
  if (delegate_)
    delegate_->set_chunk_callback(this);

  // Pushed streams do not send any data, and should always be in STATE_OPEN or
  // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push
  // behavior.
  has_upload_data_ = has_upload_data;
  if (pushed_) {
    send_time_ = base::TimeTicks::Now();
    DCHECK(!has_upload_data_);
    DCHECK(response_received());
    return ERR_IO_PENDING;
  }
  CHECK_EQ(STATE_NONE, io_state_);
  io_state_ = STATE_SEND_HEADERS;
  return DoLoop(OK);
}

int SpdyStream::WriteStreamData(IOBuffer* data, int length,
                                spdy::SpdyDataFlags flags) {
  return session_->WriteStreamData(stream_id_, data, length, flags);
}

bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
  return session_->GetSSLInfo(ssl_info, was_npn_negotiated);
}

bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
  return session_->GetSSLCertRequestInfo(cert_request_info);
}

bool SpdyStream::HasUrl() const {
  if (pushed_)
    return response_received();
  return request_.get() != NULL;
}

GURL SpdyStream::GetUrl() const {
  DCHECK(HasUrl());

  if (pushed_) {
    // assemble from the response
    std::string url;
    spdy::SpdyHeaderBlock::const_iterator it;
    it = response_->find("url");
    if (it != (*response_).end())
      url = it->second;
    return GURL(url);
  }

  // assemble from the request
  std::string scheme;
  std::string host_port;
  std::string path;
  spdy::SpdyHeaderBlock::const_iterator it;
  it = request_->find("scheme");
  if (it != (*request_).end())
    scheme = it->second;
  it = request_->find("host");
  if (it != (*request_).end())
    host_port = it->second;
  it = request_->find("path");
  if (it != (*request_).end())
    path = it->second;
  std::string url = scheme + "://" + host_port + path;
  return GURL(url);
}

int SpdyStream::DoLoop(int result) {
  do {
    State state = io_state_;
    io_state_ = STATE_NONE;
    switch (state) {
      // State machine 1: Send headers and body.
      case STATE_SEND_HEADERS:
        CHECK_EQ(OK, result);
        result = DoSendHeaders();
        break;
      case STATE_SEND_HEADERS_COMPLETE:
        result = DoSendHeadersComplete(result);
        break;
      case STATE_SEND_BODY:
        CHECK_EQ(OK, result);
        result = DoSendBody();
        break;
      case STATE_SEND_BODY_COMPLETE:
        result = DoSendBodyComplete(result);
        break;
      // This is an intermediary waiting state. This state is reached when all
      // data has been sent, but no data has been received.
      case STATE_WAITING_FOR_RESPONSE:
        io_state_ = STATE_WAITING_FOR_RESPONSE;
        result = ERR_IO_PENDING;
        break;
      // State machine 2: connection is established.
      // In STATE_OPEN, OnResponseReceived has already been called.
      // OnDataReceived, OnClose and OnWriteCompelte can be called.
      // Only OnWriteCompletee calls DoLoop(().
      //
      // For HTTP streams, no data is sent from the client while in the OPEN
      // state, so OnWriteComplete is never called here.  The HTTP body is
      // handled in the OnDataReceived callback, which does not call into
      // DoLoop.
      //
      // For WebSocket streams, which are bi-directional, we'll send and
      // receive data once the connection is established.  Received data is
      // handled in OnDataReceived.  Sent data is handled in OnWriteComplete,
      // which calls DoOpen().
      case STATE_OPEN:
        result = DoOpen(result);
        break;

      case STATE_DONE:
        DCHECK(result != ERR_IO_PENDING);
        break;
      default:
        NOTREACHED() << io_state_;
        break;
    }
  } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
           io_state_ != STATE_OPEN);

  return result;
}

int SpdyStream::DoSendHeaders() {
  CHECK(!cancelled_);

  spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE;
  if (!has_upload_data_)
    flags = spdy::CONTROL_FLAG_FIN;

  CHECK(request_.get());
  int result = session_->WriteSynStream(
      stream_id_, static_cast<RequestPriority>(priority_), flags,
      request_);
  if (result != ERR_IO_PENDING)
    return result;

  send_time_ = base::TimeTicks::Now();
  io_state_ = STATE_SEND_HEADERS_COMPLETE;
  return ERR_IO_PENDING;
}

int SpdyStream::DoSendHeadersComplete(int result) {
  if (result < 0)
    return result;

  CHECK_GT(result, 0);

  if (!delegate_)
    return ERR_UNEXPECTED;

  // There is no body, skip that state.
  if (delegate_->OnSendHeadersComplete(result)) {
    io_state_ = STATE_WAITING_FOR_RESPONSE;
    return OK;
  }

  io_state_ = STATE_SEND_BODY;
  return OK;
}

// DoSendBody is called to send the optional body for the request.  This call
// will also be called as each write of a chunk of the body completes.
int SpdyStream::DoSendBody() {
  // If we're already in the STATE_SENDING_BODY state, then we've already
  // sent a portion of the body.  In that case, we need to first consume
  // the bytes written in the body stream.  Note that the bytes written is
  // the number of bytes in the frame that were written, only consume the
  // data portion, of course.
  io_state_ = STATE_SEND_BODY_COMPLETE;
  if (!delegate_)
    return ERR_UNEXPECTED;
  return delegate_->OnSendBody();
}

int SpdyStream::DoSendBodyComplete(int result) {
  if (result < 0)
    return result;

  if (!delegate_)
    return ERR_UNEXPECTED;

  bool eof = false;
  result = delegate_->OnSendBodyComplete(result, &eof);
  if (!eof)
    io_state_ = STATE_SEND_BODY;
  else
    io_state_ = STATE_WAITING_FOR_RESPONSE;

  return result;
}

int SpdyStream::DoOpen(int result) {
  if (delegate_)
    delegate_->OnDataSent(result);
  io_state_ = STATE_OPEN;
  return result;
}

void SpdyStream::UpdateHistograms() {
  // We need all timers to be filled in, otherwise metrics can be bogus.
  if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
      recv_last_byte_time_.is_null())
    return;

  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
      recv_first_byte_time_ - send_time_);
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
      recv_last_byte_time_ - recv_first_byte_time_);
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
      recv_last_byte_time_ - send_time_);

  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
}

}  // namespace net