/*
* Copyright (c) 2011-2015, Intel Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "RemoteProcessorServer.h"
#include <iostream>
#include <memory>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include "RequestMessage.h"
#include "AnswerMessage.h"
#include "RemoteCommandHandler.h"
#include "Socket.h"
#include "convert.hpp"
using std::string;
CRemoteProcessorServer::CRemoteProcessorServer(std::string bindAddress)
: _bindAddress(bindAddress), _io_service(), _acceptor(_io_service), _socket(_io_service)
{
}
CRemoteProcessorServer::~CRemoteProcessorServer()
{
stop();
}
// State
bool CRemoteProcessorServer::start(string &error)
{
using namespace asio;
try {
generic::stream_protocol::endpoint endpoint;
uint16_t port;
std::string endpointName;
bool isInet;
// For backward compatibility, tcp port referred by its value only
if (convertTo(_bindAddress, port)) {
isInet = true;
} else {
// required form is <protocol>://<host:port|port_name>
const std::string tcpProtocol{"tcp"};
const std::string unixProtocol{"unix"};
const std::vector<std::string> supportedProtocols{ tcpProtocol, unixProtocol };
const std::string protocolDel{"://"};
size_t protocolDelPos = _bindAddress.find(protocolDel);
if (protocolDelPos == std::string::npos) {
error = "bindaddress " + _bindAddress + " ill formed, missing " + protocolDel;
return false;
}
std::string protocol = _bindAddress.substr(0, protocolDelPos);
if (std::find(begin(supportedProtocols), end(supportedProtocols), protocol) ==
end(supportedProtocols)) {
error = "bindaddress " + _bindAddress + " has invalid protocol " + protocol;
return false;
}
isInet = (_bindAddress.find(tcpProtocol) != std::string::npos);
if (isInet) {
size_t portDelPos = _bindAddress.find(':', protocolDelPos + protocolDel.size());
if (portDelPos == std::string::npos) {
error = "bindaddress " + _bindAddress + " ill formed, missing " + ":";
return false;
}
std::string portLiteral{_bindAddress.substr(portDelPos + 1)};
if (!convertTo(portLiteral, port)) {
error = "bindaddress " + _bindAddress + " port " + portLiteral + " ill formed";
return false;
}
} else {
endpointName = _bindAddress.substr(protocolDelPos + protocolDel.size());
}
}
if (isInet) {
endpoint = ip::tcp::endpoint(ip::tcp::v6(), port);
} else {
endpoint = local::stream_protocol::endpoint(endpointName);
}
_acceptor.open(endpoint.protocol());
if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
} else if (endpoint.protocol().protocol() == AF_UNSPEC) {
// In case of reuse, remote it first
unlink(endpointName.c_str());
}
_acceptor.set_option(socket_base::linger(true, 0));
_acceptor.set_option(socket_base::enable_connection_aborted(true));
_acceptor.bind(endpoint);
_acceptor.listen();
} catch (std::exception &e) {
error = "Unable to listen on " + _bindAddress + ": " + e.what();
return false;
}
return true;
}
bool CRemoteProcessorServer::stop()
{
_io_service.stop();
return true;
}
void CRemoteProcessorServer::acceptRegister(IRemoteCommandHandler &commandHandler)
{
auto peerHandler = [this, &commandHandler](asio::error_code ec) {
if (ec) {
std::cerr << "Accept failed: " << ec.message() << std::endl;
return;
}
const auto &endpoint = _socket.local_endpoint();
if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
_socket.set_option(asio::ip::tcp::no_delay(true));
}
handleNewConnection(commandHandler);
_socket.close();
acceptRegister(commandHandler);
};
_acceptor.async_accept(_socket, peerHandler);
}
bool CRemoteProcessorServer::process(IRemoteCommandHandler &commandHandler)
{
acceptRegister(commandHandler);
asio::error_code ec;
_io_service.run(ec);
if (ec) {
std::cerr << "Server failed: " << ec.message() << std::endl;
}
return ec.value() == 0;
}
// New connection
void CRemoteProcessorServer::handleNewConnection(IRemoteCommandHandler &commandHandler)
{
// Process all incoming requests from the client
while (true) {
// Process requests
// Create command message
CRequestMessage requestMessage;
string strError;
///// Receive command
CRequestMessage::Result res;
res = requestMessage.serialize(Socket(_socket), false, strError);
switch (res) {
case CRequestMessage::error:
std::cout << "Error while receiving message: " << strError << std::endl;
// fall through
case CRequestMessage::peerDisconnected:
// Consider peer disconnection as normal, no log
return; // Bail out
case CRequestMessage::success:
break; // No error, continue
}
// Actually process the request
bool bSuccess;
string strResult;
bSuccess = commandHandler.remoteCommandProcess(requestMessage, strResult);
// Send back answer
// Create answer message
CAnswerMessage answerMessage(strResult, bSuccess);
///// Send answer
res = answerMessage.serialize(_socket, true, strError);
switch (res) {
case CRequestMessage::peerDisconnected:
// Peer should not disconnect while waiting for an answer
// Fall through to log the error and bail out
case CRequestMessage::error:
std::cout << "Error while receiving message: " << strError << std::endl;
return; // Bail out
case CRequestMessage::success:
break; // No error, continue
}
}
}