// Plain text  |  436 lines  |  12.39 KB

// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/flip/flip_stream.h"

#include "base/logging.h"
#include "net/flip/flip_session.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_info.h"

namespace net {

// Constructs a stream that belongs to |session| and is identified by
// |stream_id|.
//
// |pushed| is stored in pushed_; SendRequest() uses it to decide whether the
// state machine starts at STATE_SEND_HEADERS or goes straight to
// STATE_READ_HEADERS.  |log| is stored in load_log_ and is the target of the
// LoadLog begin/end events emitted by DoLoop().
//
// The stream starts idle: io_state_ is STATE_NONE, response_status_ is OK,
// there is no response object, no request body, no pending user callback or
// read buffer, and all byte counters are zero.
FlipStream::FlipStream(FlipSession* session, flip::FlipStreamId stream_id,
                       bool pushed, LoadLog* log)
    : stream_id_(stream_id),
      priority_(0),
      pushed_(pushed),
      download_finished_(false),
      metrics_(Singleton<BandwidthMetrics>::get()),
      session_(session),
      response_(NULL),
      request_body_stream_(NULL),
      response_complete_(false),
      io_state_(STATE_NONE),
      response_status_(OK),
      user_callback_(NULL),
      user_buffer_(NULL),
      user_buffer_len_(0),
      cancelled_(false),
      load_log_(log),
      send_bytes_(0),
      recv_bytes_(0),
      histograms_recorded_(false) {}

// Tears the stream down.  A stream that still carries a non-zero id is
// cancelled on its session; a stream without an id is expected to have a
// completed response.
FlipStream::~FlipStream() {
  DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;

  if (!stream_id_) {
    // OnClose() clears stream_id_ and sets response_complete_ together, so a
    // cleared id with an incomplete response is not an expected combination.
    if (!response_complete_) {
      NOTREACHED();
    }
    return;
  }

  // TODO(willchan): We're still calling CancelStream() too many times, because
  // inactive pending/pushed streams will still have stream_id_ set.
  session_->CancelStream(stream_id_);
}

// Reports the current position of the request body stream, or 0 when the
// stream has no request body attached.
uint64 FlipStream::GetUploadProgress() const {
  return request_body_stream_.get() ? request_body_stream_->position() : 0;
}

// Returns the response object that SendRequest() stored in response_.  The
// constructor initializes response_ to NULL, so this returns NULL until
// SendRequest() has been called.
const HttpResponseInfo* FlipStream::GetResponseInfo() const {
  return response_;
}

// Waits for the response headers.
//
// Returns OK synchronously when response_->headers is already populated.
// Otherwise stores |callback| as the pending user callback, moves the state
// machine to STATE_READ_HEADERS and returns ERR_IO_PENDING.
//
// The stream must be idle (STATE_NONE), not cancelled, and |callback| must be
// non-NULL.
int FlipStream::ReadResponseHeaders(CompletionCallback* callback) {
  CHECK(callback);
  CHECK(io_state_ == STATE_NONE);
  CHECK(!cancelled_);

  if (!response_->headers) {
    // No SYN_REPLY yet: park the caller until the headers show up.
    io_state_ = STATE_READ_HEADERS;
    CHECK(!user_callback_);
    user_callback_ = callback;
    return ERR_IO_PENDING;
  }

  // The SYN_REPLY has already been received, so complete synchronously.
  return OK;
}

// Reads response body data into |buf|, copying at most |buf_len| bytes.
//
// Returns:
//   - the number of bytes copied, when body data is already buffered in
//     response_body_ (completes synchronously);
//   - response_status_, when nothing is buffered and the response is complete;
//   - ERR_IO_PENDING otherwise, after recording |buf|, |buf_len| and
//     |callback| so that OnDataReceived() can finish the read later.
//
// |buf| and |callback| must be non-NULL, |buf_len| must be non-zero, and the
// stream must not have been cancelled.
int FlipStream::ReadResponseBody(
    IOBuffer* buf, int buf_len, CompletionCallback* callback) {
  DCHECK_EQ(io_state_, STATE_NONE);
  CHECK(buf);
  CHECK(buf_len);
  CHECK(callback);
  CHECK(!cancelled_);

  // If we have data buffered, complete the IO immediately.
  if (!response_body_.empty()) {
    int bytes_read = 0;
    while (!response_body_.empty() && buf_len > 0) {
      scoped_refptr<IOBufferWithSize> data = response_body_.front();
      const int bytes_to_copy = std::min(buf_len, data->size());
      memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
      buf_len -= bytes_to_copy;
      if (bytes_to_copy == data->size()) {
        // The whole chunk was consumed; drop it from the queue.
        response_body_.pop_front();
      } else {
        // Only part of the chunk fit.  Replace it at the head of the queue
        // with a new buffer holding just the unread tail.
        const int bytes_remaining = data->size() - bytes_to_copy;
        IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
        memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
               bytes_remaining);
        response_body_.pop_front();
        response_body_.push_front(new_buffer);
      }
      bytes_read += bytes_to_copy;
    }
    // recv_bytes_ is intentionally not updated here: OnDataReceived() already
    // adds every received byte to it when the data is queued, so counting the
    // same bytes again on delivery would double the reported total.
    return bytes_read;
  } else if (response_complete_) {
    return response_status_;
  }

  // Nothing buffered and the stream is still open: remember the read so it
  // can be completed when data arrives.
  CHECK(!user_callback_);
  CHECK(!user_buffer_);
  CHECK(user_buffer_len_ == 0);

  user_callback_ = callback;
  user_buffer_ = buf;
  user_buffer_len_ = buf_len;
  return ERR_IO_PENDING;
}

// Starts the request on this stream.
//
// |response| is stored as the object that later receives the response
// headers.  |upload_data| may be NULL; when non-NULL this function takes
// ownership of it: it is kept as the request body if it has a non-zero size
// and deleted otherwise.
//
// Returns the result of running the state machine.  If that result is
// ERR_IO_PENDING, |callback| is stored as the pending user callback.
int FlipStream::SendRequest(UploadDataStream* upload_data,
                            HttpResponseInfo* response,
                            CompletionCallback* callback) {
  CHECK(callback);
  CHECK(!cancelled_);
  CHECK(response);

  response_ = response;

  if (upload_data && upload_data->size()) {
    request_body_stream_.reset(upload_data);
  } else {
    // Either there is no upload at all (deleting NULL is a no-op) or the
    // upload is empty and not worth keeping.
    delete upload_data;
  }

  send_time_ = base::TimeTicks::Now();

  DCHECK_EQ(io_state_, STATE_NONE);
  // Pushed streams have no headers to send; they start by reading headers.
  io_state_ = pushed_ ? STATE_READ_HEADERS : STATE_SEND_HEADERS;

  const int rv = DoLoop(OK);
  if (rv == ERR_IO_PENDING) {
    CHECK(!user_callback_);
    user_callback_ = callback;
  }
  return rv;
}

void FlipStream::Cancel() {
  cancelled_ = true;
  user_callback_ = NULL;

  session_->CancelStream(stream_id_);
}

// Called when the response headers for this stream have arrived.
//
// Copies |response| into the caller-supplied response object, records the
// time of the first received byte, advances the state machine if it is
// waiting on headers, and finally runs the pending user callback, if any.
// The response object must not already contain headers.
void FlipStream::OnResponseReceived(const HttpResponseInfo& response) {
  metrics_.StartStream();

  CHECK(!response_->headers);

  *response_ = response;  // TODO(mbelshe): avoid copy.
  DCHECK(response_->headers);

  recv_first_byte_time_ = base::TimeTicks::Now();

  int rv = OK;
  if (io_state_ == STATE_NONE) {
    // Headers arrived while the state machine is idle, which is only
    // expected for pushed streams.  There is no state to advance, and
    // DoLoop() has no case for STATE_NONE (it would hit NOTREACHED()), so
    // skip it; the result is simply OK.
    CHECK(pushed_);
  } else {
    if (io_state_ == STATE_READ_HEADERS_COMPLETE) {
      CHECK(!pushed_);
    } else {
      NOTREACHED();
    }
    rv = DoLoop(OK);
  }

  if (user_callback_)
    DoCallback(rv);
}

bool FlipStream::OnDataReceived(const char* data, int length) {
  DCHECK_GE(length, 0);
  LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
            << stream_id_;

  // If we don't have a response, then the SYN_REPLY did not come through.
  // We cannot pass data up to the caller unless the reply headers have been
  // received.
  if (!response_->headers) {
    OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
    return false;
  }

  if (length > 0)
    recv_bytes_ += length;
  recv_last_byte_time_ = base::TimeTicks::Now();

  // A zero-length read means that the stream is being closed.
  if (!length) {
    metrics_.StopStream();
    download_finished_ = true;
    OnClose(net::OK);
    return true;
  }

  // Track our bandwidth.
  metrics_.RecordBytes(length);

  if (length > 0) {
    // TODO(mbelshe): If read is pending, we should copy the data straight into
    // the read buffer here.  For now, we'll queue it always.
    // TODO(mbelshe): We need to have some throttling on this.  We shouldn't
    //                buffer an infinite amount of data.

    IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
    memcpy(io_buffer->data(), data, length);

    response_body_.push_back(io_buffer);
  }

  // Note that data may be received for a FlipStream prior to the user calling
  // ReadResponseBody(), therefore user_callback_ may be NULL.  This may often
  // happen for server initiated streams.
  if (user_callback_) {
    int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
    CHECK(rv != ERR_IO_PENDING);
    user_buffer_ = NULL;
    user_buffer_len_ = 0;
    DoCallback(rv);
  }

  return true;
}

// Called when a write issued for this stream has finished.  A positive
// |status| is added to the sent-bytes counter; |status| is then fed into the
// state machine, whose return value is not used here.
void FlipStream::OnWriteComplete(int status) {
  // TODO(mbelshe): Check for cancellation here.  If we're cancelled, we
  // should discontinue the DoLoop.

  // Only positive values count as bytes written; zero and errors add nothing.
  send_bytes_ += (status > 0) ? status : 0;

  DoLoop(status);
}

void FlipStream::OnClose(int status) {
  response_complete_ = true;
  response_status_ = status;
  stream_id_ = 0;

  if (user_callback_)
    DoCallback(status);

  UpdateHistograms();
}

int FlipStream::DoLoop(int result) {
  do {
    State state = io_state_;
    io_state_ = STATE_NONE;
    switch (state) {
      // State machine 1: Send headers and wait for response headers.
      case STATE_SEND_HEADERS:
        CHECK(result == OK);
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
        result = DoSendHeaders();
        break;
      case STATE_SEND_HEADERS_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
        result = DoSendHeadersComplete(result);
        break;
      case STATE_SEND_BODY:
        CHECK(result == OK);
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
        result = DoSendBody();
        break;
      case STATE_SEND_BODY_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
        result = DoSendBodyComplete(result);
        break;
      case STATE_READ_HEADERS:
        CHECK(result == OK);
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
        result = DoReadHeaders();
        break;
      case STATE_READ_HEADERS_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
        result = DoReadHeadersComplete(result);
        break;

      // State machine 2: Read body.
      // NOTE(willchan): Currently unused.  Currently we handle this stuff in
      // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc.  Only reason
      // to do this is for consistency with the Http code.
      case STATE_READ_BODY:
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_READ_BODY);
        result = DoReadBody();
        break;
      case STATE_READ_BODY_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_READ_BODY);
        result = DoReadBodyComplete(result);
        break;
      case STATE_DONE:
        DCHECK(result != ERR_IO_PENDING);
        break;
      default:
        NOTREACHED();
        break;
    }
  } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE);

  return result;
}

void FlipStream::DoCallback(int rv) {
  CHECK(rv != ERR_IO_PENDING);
  CHECK(user_callback_);

  // Since Run may result in being called back, clear user_callback_ in advance.
  CompletionCallback* c = user_callback_;
  user_callback_ = NULL;
  c->Run(rv);
}

// STATE_SEND_HEADERS handler.  Does not write anything itself: it only
// advances the state machine to STATE_SEND_HEADERS_COMPLETE and returns
// ERR_IO_PENDING, so DoLoop() stops until it is re-entered (see
// OnWriteComplete()).
int FlipStream::DoSendHeaders() {
  // The FlipSession will always call us back when the send is complete.
  // TODO(willchan): This code makes the assumption that for the non-push stream
  // case, the client code calls SendRequest() after creating the stream and
  // before yielding back to the MessageLoop.  This is true in the current code,
  // but is not obvious from the headers.  We should make the code handle
  // SendRequest() being called after the SYN_REPLY has been received.
  io_state_ = STATE_SEND_HEADERS_COMPLETE;
  return ERR_IO_PENDING;
}

int FlipStream::DoSendHeadersComplete(int result) {
  if (result < 0)
    return result;

  CHECK(result > 0);

  // There is no body, skip that state.
  if (!request_body_stream_.get()) {
    io_state_ = STATE_READ_HEADERS;
    return OK;
  }

  io_state_ = STATE_SEND_BODY;
  return OK;
}

// STATE_SEND_BODY handler.  Hands the request body stream's current buffer to
// the session for writing on this stream and moves the state machine to
// STATE_SEND_BODY_COMPLETE.  Returns whatever WriteStreamData() returns.
//
// This runs once per chunk of the body: DoSendBodyComplete() consumes the
// bytes that were written and returns here while data remains.
int FlipStream::DoSendBody() {
  io_state_ = STATE_SEND_BODY_COMPLETE;

  const int chunk_len = static_cast<int>(request_body_stream_->buf_len());
  return session_->WriteStreamData(
      stream_id_, request_body_stream_->buf(), chunk_len);
}

// STATE_SEND_BODY_COMPLETE handler.  A negative |result| is returned
// unchanged.  Otherwise |result| (which must be non-zero) is consumed from
// the request body stream, and the next state is STATE_SEND_BODY while the
// stream position is short of its size, or STATE_READ_HEADERS once the whole
// body has been sent.  Returns OK on success.
int FlipStream::DoSendBodyComplete(int result) {
  if (result < 0)
    return result;

  CHECK(result != 0);

  request_body_stream_->DidConsume(result);

  const bool more_to_send =
      request_body_stream_->position() < request_body_stream_->size();
  io_state_ = more_to_send ? STATE_SEND_BODY : STATE_READ_HEADERS;

  return OK;
}

// STATE_READ_HEADERS handler.  Moves to STATE_READ_HEADERS_COMPLETE and
// returns OK if the response headers are already present, or ERR_IO_PENDING
// if they have yet to arrive.
int FlipStream::DoReadHeaders() {
  io_state_ = STATE_READ_HEADERS_COMPLETE;

  if (response_->headers)
    return OK;
  return ERR_IO_PENDING;
}

// STATE_READ_HEADERS_COMPLETE handler.  Passes |result| through unchanged and
// sets no follow-up state, so DoLoop() stops after this handler.
int FlipStream::DoReadHeadersComplete(int result) {
  return result;
}

// STATE_READ_BODY handler.  Placeholder: it always returns ERR_IO_PENDING and
// sets no follow-up state.  DoLoop() notes that the read-body states are
// currently unused.
int FlipStream::DoReadBody() {
  // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
  // makes sense.
  return ERR_IO_PENDING;
}

// STATE_READ_BODY_COMPLETE handler.  Placeholder: it ignores |result|, always
// returns ERR_IO_PENDING and sets no follow-up state.  DoLoop() notes that the
// read-body states are currently unused.
int FlipStream::DoReadBodyComplete(int result) {
  // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
  // makes sense.
  return ERR_IO_PENDING;
}

// Reports per-stream timing and byte-count histograms.  Runs at most once
// per stream: the first call marks the histograms as recorded even if it ends
// up reporting nothing because a timestamp is missing.
void FlipStream::UpdateHistograms() {
  if (histograms_recorded_)
    return;
  histograms_recorded_ = true;

  // Every timestamp has to be set; a missing one would produce bogus deltas.
  const bool all_timers_set = !send_time_.is_null() &&
                              !recv_first_byte_time_.is_null() &&
                              !recv_last_byte_time_.is_null();
  if (!all_timers_set)
    return;

  // Timings: request sent -> first byte, first byte -> last byte, and the
  // whole span.
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
                      recv_first_byte_time_ - send_time_);
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
                      recv_last_byte_time_ - recv_first_byte_time_);
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
                      recv_last_byte_time_ - send_time_);

  // Byte counters.
  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
}

}  // namespace net