普通文本  |  427行  |  14.95 KB

// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/quic_dispatcher.h"

#include <errno.h>

#include "base/debug/stack_trace.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "net/quic/quic_blocked_writer_interface.h"
#include "net/quic/quic_connection_helper.h"
#include "net/quic/quic_flags.h"
#include "net/quic/quic_time_wait_list_manager.h"
#include "net/quic/quic_utils.h"

namespace net {

using base::StringPiece;
using std::make_pair;
using std::find;

class DeleteSessionsAlarm : public QuicAlarm::Delegate {
 public:
  explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
      : dispatcher_(dispatcher) {
  }

  virtual QuicTime OnAlarm() OVERRIDE {
    dispatcher_->DeleteSessions();
    return QuicTime::Zero();
  }

 private:
  QuicDispatcher* dispatcher_;
};

class QuicDispatcher::QuicFramerVisitor : public QuicFramerVisitorInterface {
 public:
  explicit QuicFramerVisitor(QuicDispatcher* dispatcher)
      : dispatcher_(dispatcher),
        connection_id_(0) {}

  // QuicFramerVisitorInterface implementation
  virtual void OnPacket() OVERRIDE {}
  virtual bool OnUnauthenticatedPublicHeader(
      const QuicPacketPublicHeader& header) OVERRIDE {
    connection_id_ = header.connection_id;
    return dispatcher_->OnUnauthenticatedPublicHeader(header);
  }
  virtual bool OnUnauthenticatedHeader(
      const QuicPacketHeader& header) OVERRIDE {
    dispatcher_->OnUnauthenticatedHeader(header);
    return false;
  }
  virtual void OnError(QuicFramer* framer) OVERRIDE {
    DVLOG(1) << QuicUtils::ErrorToString(framer->error());
  }

  virtual bool OnProtocolVersionMismatch(
      QuicVersion /*received_version*/) OVERRIDE {
    if (dispatcher_->time_wait_list_manager()->IsConnectionIdInTimeWait(
            connection_id_)) {
      // Keep processing after protocol mismatch - this will be dealt with by
      // the TimeWaitListManager.
      return true;
    } else {
      DLOG(DFATAL) << "Version mismatch, connection ID (" << connection_id_
                   << ") not in time wait list.";
      return false;
    }
  }

  // The following methods should never get called because we always return
  // false from OnUnauthenticatedHeader().  As a result, we never process the
  // payload of the packet.
  virtual void OnPublicResetPacket(
      const QuicPublicResetPacket& /*packet*/) OVERRIDE {
    DCHECK(false);
  }
  virtual void OnVersionNegotiationPacket(
      const QuicVersionNegotiationPacket& /*packet*/) OVERRIDE {
    DCHECK(false);
  }
  virtual void OnDecryptedPacket(EncryptionLevel level) OVERRIDE {
    DCHECK(false);
  }
  virtual bool OnPacketHeader(const QuicPacketHeader& /*header*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual void OnRevivedPacket() OVERRIDE {
    DCHECK(false);
  }
  virtual void OnFecProtectedPayload(StringPiece /*payload*/) OVERRIDE {
    DCHECK(false);
  }
  virtual bool OnStreamFrame(const QuicStreamFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnAckFrame(const QuicAckFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnCongestionFeedbackFrame(
      const QuicCongestionFeedbackFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnStopWaitingFrame(
      const QuicStopWaitingFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnPingFrame(const QuicPingFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnRstStreamFrame(const QuicRstStreamFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnConnectionCloseFrame(
      const QuicConnectionCloseFrame & /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnGoAwayFrame(const QuicGoAwayFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnWindowUpdateFrame(const QuicWindowUpdateFrame& /*frame*/)
      OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnBlockedFrame(const QuicBlockedFrame& frame) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual void OnFecData(const QuicFecData& /*fec*/) OVERRIDE {
    DCHECK(false);
  }
  virtual void OnPacketComplete() OVERRIDE {
    DCHECK(false);
  }

 private:
  QuicDispatcher* dispatcher_;

  // Latched in OnUnauthenticatedPublicHeader for use later.
  QuicConnectionId connection_id_;
};

QuicDispatcher::QuicDispatcher(const QuicConfig& config,
                               const QuicCryptoServerConfig& crypto_config,
                               const QuicVersionVector& supported_versions,
                               QuicConnectionHelperInterface* helper)
    : config_(config),
      crypto_config_(crypto_config),
      helper_(helper),
      delete_sessions_alarm_(
          helper_->CreateAlarm(new DeleteSessionsAlarm(this))),
      supported_versions_(supported_versions),
      supported_versions_no_flow_control_(supported_versions),
      supported_versions_no_connection_flow_control_(supported_versions),
      current_packet_(NULL),
      framer_(supported_versions, /*unused*/ QuicTime::Zero(), true),
      framer_visitor_(new QuicFramerVisitor(this)) {
  framer_.set_visitor(framer_visitor_.get());
}

QuicDispatcher::~QuicDispatcher() {
  STLDeleteValues(&session_map_);
  STLDeleteElements(&closed_session_list_);
}

void QuicDispatcher::Initialize(QuicPacketWriter* writer) {
  DCHECK(writer_ == NULL);
  writer_.reset(writer);
  time_wait_list_manager_.reset(CreateQuicTimeWaitListManager());

  // Remove all versions > QUIC_VERSION_16 from the
  // supported_versions_no_flow_control_ vector.
  QuicVersionVector::iterator it =
      find(supported_versions_no_flow_control_.begin(),
           supported_versions_no_flow_control_.end(), QUIC_VERSION_17);
  if (it != supported_versions_no_flow_control_.end()) {
    supported_versions_no_flow_control_.erase(
        supported_versions_no_flow_control_.begin(), it + 1);
  }
  CHECK(!supported_versions_no_flow_control_.empty());

  // Remove all versions > QUIC_VERSION_18 from the
  // supported_versions_no_connection_flow_control_ vector.
  QuicVersionVector::iterator connection_it =
      find(supported_versions_no_connection_flow_control_.begin(),
           supported_versions_no_connection_flow_control_.end(),
           QUIC_VERSION_19);
  if (connection_it != supported_versions_no_connection_flow_control_.end()) {
    supported_versions_no_connection_flow_control_.erase(
        supported_versions_no_connection_flow_control_.begin(),
        connection_it + 1);
  }
  CHECK(!supported_versions_no_connection_flow_control_.empty());
}

void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
                                   const IPEndPoint& client_address,
                                   const QuicEncryptedPacket& packet) {
  current_server_address_ = server_address;
  current_client_address_ = client_address;
  current_packet_ = &packet;
  // ProcessPacket will cause the packet to be dispatched in
  // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
  // in OnAuthenticatedHeader.
  framer_.ProcessPacket(packet);
  // TODO(rjshade): Return a status describing if/why a packet was dropped,
  //                and log somehow.  Maybe expose as a varz.
}

bool QuicDispatcher::OnUnauthenticatedPublicHeader(
    const QuicPacketPublicHeader& header) {
  QuicSession* session = NULL;

  QuicConnectionId connection_id = header.connection_id;
  SessionMap::iterator it = session_map_.find(connection_id);
  if (it == session_map_.end()) {
    if (header.reset_flag) {
      return false;
    }
    if (time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id)) {
      return HandlePacketForTimeWait(header);
    }

    // Ensure the packet has a version negotiation bit set before creating a new
    // session for it.  All initial packets for a new connection are required to
    // have the flag set.  Otherwise it may be a stray packet.
    if (header.version_flag) {
      session = CreateQuicSession(connection_id, current_server_address_,
                                  current_client_address_);
    }

    if (session == NULL) {
      DVLOG(1) << "Failed to create session for " << connection_id;
      // Add this connection_id fo the time-wait state, to safely reject future
      // packets.

      if (header.version_flag &&
          !framer_.IsSupportedVersion(header.versions.front())) {
        // TODO(ianswett): Produce a no-version version negotiation packet.
        return false;
      }

      // Use the version in the packet if possible, otherwise assume the latest.
      QuicVersion version = header.version_flag ? header.versions.front() :
          supported_versions_.front();
      time_wait_list_manager_->AddConnectionIdToTimeWait(
          connection_id, version, NULL);
      DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id));
      return HandlePacketForTimeWait(header);
    }
    DVLOG(1) << "Created new session for " << connection_id;
    session_map_.insert(make_pair(connection_id, session));
  } else {
    session = it->second;
  }

  session->connection()->ProcessUdpPacket(
      current_server_address_, current_client_address_, *current_packet_);

  // Do not parse the packet further.  The session will process it completely.
  return false;
}

void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader& header) {
  DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(
      header.public_header.connection_id));
  time_wait_list_manager_->ProcessPacket(current_server_address_,
                                         current_client_address_,
                                         header.public_header.connection_id,
                                         header.packet_sequence_number,
                                         *current_packet_);
}

void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
  QuicConnection* connection = it->second->connection();
  QuicEncryptedPacket* connection_close_packet =
      connection->ReleaseConnectionClosePacket();
  write_blocked_list_.erase(connection);
  time_wait_list_manager_->AddConnectionIdToTimeWait(it->first,
                                                     connection->version(),
                                                     connection_close_packet);
  session_map_.erase(it);
}

void QuicDispatcher::DeleteSessions() {
  STLDeleteElements(&closed_session_list_);
}

void QuicDispatcher::OnCanWrite() {
  // We got an EPOLLOUT: the socket should not be blocked.
  writer_->SetWritable();

  // Give each writer one attempt to write.
  int num_writers = write_blocked_list_.size();
  for (int i = 0; i < num_writers; ++i) {
    if (write_blocked_list_.empty()) {
      return;
    }
    QuicBlockedWriterInterface* blocked_writer =
        write_blocked_list_.begin()->first;
    write_blocked_list_.erase(write_blocked_list_.begin());
    blocked_writer->OnCanWrite();
    if (writer_->IsWriteBlocked()) {
      // We were unable to write.  Wait for the next EPOLLOUT. The writer is
      // responsible for adding itself to the blocked list via OnWriteBlocked().
      return;
    }
  }
}

bool QuicDispatcher::HasPendingWrites() const {
  return !write_blocked_list_.empty();
}

void QuicDispatcher::Shutdown() {
  while (!session_map_.empty()) {
    QuicSession* session = session_map_.begin()->second;
    session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
    // Validate that the session removes itself from the session map on close.
    DCHECK(session_map_.empty() || session_map_.begin()->second != session);
  }
  DeleteSessions();
}

void QuicDispatcher::OnConnectionClosed(QuicConnectionId connection_id,
                                        QuicErrorCode error) {
  SessionMap::iterator it = session_map_.find(connection_id);
  if (it == session_map_.end()) {
    LOG(DFATAL) << "ConnectionId " << connection_id
                << " does not exist in the session map.  "
                << "Error: " << QuicUtils::ErrorToString(error);
    LOG(DFATAL) << base::debug::StackTrace().ToString();
    return;
  }
  DVLOG_IF(1, error != QUIC_NO_ERROR) << "Closing connection ("
                                      << connection_id
                                      << ") due to error: "
                                      << QuicUtils::ErrorToString(error);
  if (closed_session_list_.empty()) {
    delete_sessions_alarm_->Set(helper_->GetClock()->ApproximateNow());
  }
  closed_session_list_.push_back(it->second);
  CleanUpSession(it);
}

void QuicDispatcher::OnWriteBlocked(QuicBlockedWriterInterface* writer) {
  DCHECK(writer_->IsWriteBlocked());
  write_blocked_list_.insert(make_pair(writer, true));
}

QuicSession* QuicDispatcher::CreateQuicSession(
    QuicConnectionId connection_id,
    const IPEndPoint& server_address,
    const IPEndPoint& client_address) {
  QuicServerSession* session = new QuicServerSession(
      config_,
      CreateQuicConnection(connection_id, server_address, client_address),
      this);
  session->InitializeSession(crypto_config_);
  return session;
}

QuicConnection* QuicDispatcher::CreateQuicConnection(
    QuicConnectionId connection_id,
    const IPEndPoint& server_address,
    const IPEndPoint& client_address) {
  if (FLAGS_enable_quic_stream_flow_control_2 &&
      FLAGS_enable_quic_connection_flow_control_2) {
    DVLOG(1) << "Creating QuicDispatcher with all versions.";
    return new QuicConnection(connection_id, client_address, helper_,
                              writer_.get(), true, supported_versions_);
  }

  if (FLAGS_enable_quic_stream_flow_control_2 &&
      !FLAGS_enable_quic_connection_flow_control_2) {
    DVLOG(1) << "Connection flow control disabled, creating QuicDispatcher "
             << "WITHOUT version 19 or higher.";
    return new QuicConnection(connection_id, client_address, helper_,
                              writer_.get(), true,
                              supported_versions_no_connection_flow_control_);
  }

  DVLOG(1) << "Flow control disabled, creating QuicDispatcher WITHOUT "
           << "version 17 or higher.";
  return new QuicConnection(connection_id, client_address, helper_,
                            writer_.get(), true,
                            supported_versions_no_flow_control_);
}

QuicTimeWaitListManager* QuicDispatcher::CreateQuicTimeWaitListManager() {
  return new QuicTimeWaitListManager(
      writer_.get(), this, helper_, supported_versions());
}

bool QuicDispatcher::HandlePacketForTimeWait(
    const QuicPacketPublicHeader& header) {
  if (header.reset_flag) {
    // Public reset packets do not have sequence numbers, so ignore the packet.
    return false;
  }

  // Switch the framer to the correct version, so that the sequence number can
  // be parsed correctly.
  framer_.set_version(time_wait_list_manager_->GetQuicVersionFromConnectionId(
      header.connection_id));

  // Continue parsing the packet to extract the sequence number.  Then
  // send it to the time wait manager in OnUnathenticatedHeader.
  return true;
}

}  // namespace net