// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/tools/quic/quic_dispatcher.h"
#include <errno.h>
#include "base/logging.h"
#include "base/stl_util.h"
#include "net/quic/quic_blocked_writer_interface.h"
#include "net/quic/quic_utils.h"
#include "net/tools/quic/quic_default_packet_writer.h"
#include "net/tools/quic/quic_epoll_connection_helper.h"
#include "net/tools/quic/quic_socket_utils.h"
namespace net {
namespace tools {
using std::make_pair;
class DeleteSessionsAlarm : public EpollAlarm {
public:
explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
: dispatcher_(dispatcher) {
}
virtual int64 OnAlarm() OVERRIDE {
EpollAlarm::OnAlarm();
dispatcher_->DeleteSessions();
return 0;
}
private:
QuicDispatcher* dispatcher_;
};
QuicDispatcher::QuicDispatcher(const QuicConfig& config,
const QuicCryptoServerConfig& crypto_config,
const QuicVersionVector& supported_versions,
int fd,
EpollServer* epoll_server)
: config_(config),
crypto_config_(crypto_config),
time_wait_list_manager_(
new QuicTimeWaitListManager(this, epoll_server, supported_versions)),
delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
epoll_server_(epoll_server),
fd_(fd),
write_blocked_(false),
helper_(new QuicEpollConnectionHelper(epoll_server_)),
writer_(new QuicDefaultPacketWriter(fd)),
supported_versions_(supported_versions) {
}
QuicDispatcher::~QuicDispatcher() {
STLDeleteValues(&session_map_);
STLDeleteElements(&closed_session_list_);
}
void QuicDispatcher::set_fd(int fd) {
fd_ = fd;
writer_.reset(new QuicDefaultPacketWriter(fd));
}
WriteResult QuicDispatcher::WritePacket(const char* buffer, size_t buf_len,
const IPAddressNumber& self_address,
const IPEndPoint& peer_address,
QuicBlockedWriterInterface* writer) {
if (write_blocked_) {
write_blocked_list_.insert(make_pair(writer, true));
return WriteResult(WRITE_STATUS_BLOCKED, EAGAIN);
}
WriteResult result =
writer_->WritePacket(buffer, buf_len, self_address, peer_address, writer);
if (result.status == WRITE_STATUS_BLOCKED) {
write_blocked_list_.insert(make_pair(writer, true));
write_blocked_ = true;
}
return result;
}
bool QuicDispatcher::IsWriteBlockedDataBuffered() const {
return writer_->IsWriteBlockedDataBuffered();
}
void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
const IPEndPoint& client_address,
QuicGuid guid,
bool has_version_flag,
const QuicEncryptedPacket& packet) {
QuicSession* session = NULL;
SessionMap::iterator it = session_map_.find(guid);
if (it == session_map_.end()) {
if (time_wait_list_manager_->IsGuidInTimeWait(guid)) {
time_wait_list_manager_->ProcessPacket(server_address,
client_address,
guid,
packet);
return;
}
// Ensure the packet has a version negotiation bit set before creating a new
// session for it. All initial packets for a new connection are required to
// have the flag set. Otherwise it may be a stray packet.
if (has_version_flag) {
session = CreateQuicSession(guid, server_address, client_address);
}
if (session == NULL) {
DLOG(INFO) << "Failed to create session for " << guid;
// Add this guid fo the time-wait state, to safely reject future packets.
// We don't know the version here, so assume latest.
// TODO(ianswett): Produce a no-version version negotiation packet.
time_wait_list_manager_->AddGuidToTimeWait(guid,
supported_versions_.front(),
NULL);
time_wait_list_manager_->ProcessPacket(server_address,
client_address,
guid,
packet);
return;
}
DLOG(INFO) << "Created new session for " << guid;
session_map_.insert(make_pair(guid, session));
} else {
session = it->second;
}
session->connection()->ProcessUdpPacket(
server_address, client_address, packet);
}
void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
QuicConnection* connection = it->second->connection();
QuicEncryptedPacket* connection_close_packet =
connection->ReleaseConnectionClosePacket();
write_blocked_list_.erase(connection);
time_wait_list_manager_->AddGuidToTimeWait(it->first,
connection->version(),
connection_close_packet);
session_map_.erase(it);
}
void QuicDispatcher::DeleteSessions() {
STLDeleteElements(&closed_session_list_);
}
void QuicDispatcher::UseWriter(QuicPacketWriter* writer) {
writer_.reset(writer);
}
bool QuicDispatcher::OnCanWrite() {
// We got an EPOLLOUT: the socket should not be blocked.
write_blocked_ = false;
// Give each writer one attempt to write.
int num_writers = write_blocked_list_.size();
for (int i = 0; i < num_writers; ++i) {
if (write_blocked_list_.empty()) {
break;
}
QuicBlockedWriterInterface* writer = write_blocked_list_.begin()->first;
write_blocked_list_.erase(write_blocked_list_.begin());
bool can_write_more = writer->OnCanWrite();
if (write_blocked_) {
// We were unable to write. Wait for the next EPOLLOUT.
// In this case, the session would have been added to the blocked list
// up in WritePacket.
return false;
}
// The socket is not blocked but the writer has ceded work. Add it to the
// end of the list.
if (can_write_more) {
write_blocked_list_.insert(make_pair(writer, true));
}
}
// We're not write blocked. Return true if there's more work to do.
return !write_blocked_list_.empty();
}
void QuicDispatcher::Shutdown() {
while (!session_map_.empty()) {
QuicSession* session = session_map_.begin()->second;
session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
// Validate that the session removes itself from the session map on close.
DCHECK(session_map_.empty() || session_map_.begin()->second != session);
}
DeleteSessions();
}
void QuicDispatcher::OnConnectionClosed(QuicGuid guid, QuicErrorCode error) {
SessionMap::iterator it = session_map_.find(guid);
if (it == session_map_.end()) {
LOG(DFATAL) << "GUID " << guid << " does not exist in the session map. "
<< "Error: " << QuicUtils::ErrorToString(error);
return;
}
DLOG_IF(INFO, error != QUIC_NO_ERROR) << "Closing connection (" << guid
<< ") due to error: "
<< QuicUtils::ErrorToString(error);
if (closed_session_list_.empty()) {
epoll_server_->RegisterAlarmApproximateDelta(
0, delete_sessions_alarm_.get());
}
closed_session_list_.push_back(it->second);
CleanUpSession(it);
}
QuicSession* QuicDispatcher::CreateQuicSession(
QuicGuid guid,
const IPEndPoint& server_address,
const IPEndPoint& client_address) {
QuicServerSession* session = new QuicServerSession(
config_, new QuicConnection(guid, client_address, helper_.get(), this,
true, supported_versions_), this);
session->InitializeSession(crypto_config_);
return session;
}
} // namespace tools
} // namespace net