/*-------------------------------------------------------------------------
* drawElements Quality Program Execution Server
* ---------------------------------------------
*
* Copyright 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*//*!
* \file
* \brief Test Execution Server.
*//*--------------------------------------------------------------------*/
#include "xsExecutionServer.hpp"
#include "deClock.h"
#include <cstdio>
using std::vector;
using std::string;
// Debug print helper. The argument is a complete, parenthesized printf
// argument list, so call sites use double parentheses:
//   DBG_PRINT(("value = %d\n", value));
// The "#if 1" below keeps debug printing enabled; change it to 0 to make
// DBG_PRINT expand to nothing.
#if 1
# define DBG_PRINT(X) printf X
#else
# define DBG_PRINT(X)
#endif
namespace xs
{
// Tells whether a whole message has been accumulated: the buffer must hold
// at least a full header, and its size must equal the size reported by
// getMessageSize(). The size is not consulted until the header is present.
inline bool MessageBuilder::isComplete (void) const
{
	const bool haveHeader = m_buffer.size() >= MESSAGE_HEADER_SIZE;

	return haveHeader && m_buffer.size() == getMessageSize();
}
// Returns a pointer to the first byte following the header, or DE_NULL when
// the buffer holds nothing beyond the header (header-only or shorter).
const deUint8* MessageBuilder::getMessageData (void) const
{
	if (m_buffer.size() <= MESSAGE_HEADER_SIZE)
		return DE_NULL;

	return &m_buffer[MESSAGE_HEADER_SIZE];
}
// Number of bytes in the buffer beyond the header. The message is asserted
// to be complete before the size is computed.
size_t MessageBuilder::getMessageDataSize (void) const
{
	DE_ASSERT(isComplete());

	const size_t totalSize = m_buffer.size();
	return totalSize - MESSAGE_HEADER_SIZE;
}
// Moves bytes from src into the internal message buffer.
//
// Phase 1: bytes are taken one at a time until the fixed-size header is
// complete, at which point the header is parsed into m_messageType and
// m_messageSize.
// Phase 2: once the header is present, at most as many bytes as are still
// missing from the message (per getMessageSize()) are taken from src.
//
// Throws ProtocolError if the size reported for the message is smaller than
// what has already been buffered (i.e. smaller than the header itself).
// Without that check the "bytes left" computation would wrap around and
// lead to an out-of-bounds write into m_buffer.
void MessageBuilder::read (ByteBuffer& src)
{
	// Try to get header.
	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
	{
		while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
			   src.getNumElements() > 0)
			m_buffer.push_back(src.popBack());

		DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);

		if (m_buffer.size() == MESSAGE_HEADER_SIZE)
		{
			// Got whole header, parse it.
			Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
		}
	}

	if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
	{
		// We have header.
		const size_t msgSize = getMessageSize();
		const size_t curSize = m_buffer.size();

		// The size field is read from incoming data and cannot be trusted.
		if (msgSize < curSize)
			throw ProtocolError("Invalid message size");

		// Do the arithmetic in size_t: narrowing the remaining byte count to
		// int could turn a large value negative.
		const size_t numBytesLeft	= msgSize - curSize;
		const size_t numAvailable	= (size_t)src.getNumElements();
		const size_t numToRead		= de::min(numAvailable, numBytesLeft);

		if (numToRead > 0)
		{
			m_buffer.resize(curSize + numToRead);
			// numToRead <= numAvailable, which originated from an int.
			src.popBack(&m_buffer[curSize], (int)numToRead);
		}
	}
}
// Returns the builder to its empty state: no type, zero size, no buffered
// bytes.
void MessageBuilder::clear (void)
{
	m_messageSize	= 0;
	m_messageType	= MESSAGETYPE_NONE;
	m_buffer.clear();
}
// Constructs the server: family and port are forwarded to the TcpServer
// base, the owned test driver is built from testProcess, and runMode is
// stored for later use in connectionDone().
ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
: TcpServer (family, port)
, m_testDriver (testProcess)
, m_runMode (runMode)
{
}
// Intentionally empty: no explicit cleanup is performed here.
ExecutionServer::~ExecutionServer (void)
{
}
// Hands out the server's test driver to one user at a time. The lock is
// only tried, never waited on: if it is already held, Error is thrown.
// A successful call must be paired with releaseTestDriver().
TestDriver* ExecutionServer::acquireTestDriver (void)
{
	const bool gotLock = m_testDriverLock.tryLock();

	if (gotLock)
		return &m_testDriver;

	throw Error("Failed to acquire test driver");
}
// Gives the test driver back by unlocking the driver lock. The pointer is
// used only for a sanity check against the server's own driver.
void ExecutionServer::releaseTestDriver (TestDriver* driver)
{
	DE_ASSERT(driver == &m_testDriver);
	DE_UNREF(driver); // presumably avoids an unused-parameter warning when the assert is compiled out

	m_testDriverLock.unlock();
}
// Logs the client's host and port, then allocates a new
// ExecutionRequestHandler bound to this server and the given socket. The
// handler is returned as a raw pointer allocated with new.
ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
{
	printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());

	ExecutionRequestHandler* const handler = new ExecutionRequestHandler(this, socket);
	return handler;
}
// Called when a connection handler has finished. In single-exec run mode
// m_socket is closed first; in every mode the notification is then passed
// on to the TcpServer base implementation.
void ExecutionServer::connectionDone (ConnectionHandler* handler)
{
	const bool singleExec = (m_runMode == RUNMODE_SINGLE_EXEC);

	if (singleExec)
		m_socket.close();

	TcpServer::connectionDone(handler);
}
// Creates the handler for one client connection.
//
// The handler starts without a test driver (m_testDriver is DE_NULL) and
// with m_run cleared; the input, output and temporary transfer buffers are
// sized from the RECV/SEND/SEND_RECV_TMP buffer size constants.
ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
: ConnectionHandler (server, socket)
, m_execServer (server)
, m_testDriver (DE_NULL)
, m_bufferIn (RECV_BUFFER_SIZE)
, m_bufferOut (SEND_BUFFER_SIZE)
, m_run (false)
, m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE)
{
// Set flags.
// Non-blocking matters here: receive() and send() treat a WOULD_BLOCK
// result as "no IO happened" rather than waiting.
m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
// Init protocol keepalives.
// Both keepalive timestamps start at the current time.
initKeepAlives();
}
// Gives the test driver back to the server if this handler still holds one
// at destruction time.
ExecutionRequestHandler::~ExecutionRequestHandler (void)
{
	const bool holdsDriver = (m_testDriver != DE_NULL);

	if (holdsDriver)
		m_execServer->releaseTestDriver(m_testDriver);
}
// Entry point for serving one connection.
//
// Runs the session loop, then performs cleanup regardless of how the
// session ended: the test driver (if one was acquired) is reset and handed
// back to the server, and the socket is shut down if still connected.
// Exceptions derived from std::exception that escape the session are logged
// and end the session; they are not propagated.
void ExecutionRequestHandler::handle (void)
{
	DBG_PRINT(("ExecutionRequestHandler::handle()\n"));

	try
	{
		// Process execution session.
		processSession();
	}
	catch (const std::exception& e)
	{
		printf("ExecutionRequestHandler::handle(): %s\n", e.what());
	}

	DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));

	// Release test driver.
	if (m_testDriver)
	{
		// Resetting is best effort: a failure is reported but must not
		// prevent the driver from being released below.
		try
		{
			m_testDriver->reset();
		}
		catch (const std::exception& e)
		{
			printf("ExecutionRequestHandler::handle(): test driver reset failed: %s\n", e.what());
		}
		catch (...)
		{
			printf("ExecutionRequestHandler::handle(): test driver reset failed\n");
		}

		m_execServer->releaseTestDriver(m_testDriver);
		m_testDriver = DE_NULL; // keeps the destructor from releasing it a second time
	}

	// Close connection.
	if (m_socket->isConnected())
		m_socket->shutdown();
}
void ExecutionRequestHandler::acquireTestDriver (void)
{
DE_ASSERT(!m_testDriver);
// Try to acquire test driver - may fail.
m_testDriver = m_execServer->acquireTestDriver();
DE_ASSERT(m_testDriver);
m_testDriver->reset();
}
void ExecutionRequestHandler::processSession (void)
{
m_run = true;
deUint64 lastIoTime = deGetMicroseconds();
while (m_run)
{
bool anyIO = false;
// Read from socket to buffer.
anyIO = receive() || anyIO;
// Send bytes in buffer.
anyIO = send() || anyIO;
// Process incoming data.
if (m_bufferIn.getNumElements() > 0)
{
DE_ASSERT(!m_msgBuilder.isComplete());
m_msgBuilder.read(m_bufferIn);
}
if (m_msgBuilder.isComplete())
{
// Process message.
processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
m_msgBuilder.clear();
}
// Keepalives, anyone?
pollKeepAlives();
// Poll test driver for IO.
if (m_testDriver)
anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
// If no IO happens in a reasonable amount of time, go to sleep.
{
deUint64 curTime = deGetMicroseconds();
if (anyIO)
lastIoTime = curTime;
else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
else
deYield(); // Just give other threads chance to run.
}
}
}
// Decodes one complete message and acts on it.
//
// type      message type taken from the header
// data      bytes following the header (as given by the message builder)
// dataSize  number of bytes at data
//
// Throws ProtocolError for an unsupported protocol version in a hello
// message and for any message type not handled below.
void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
{
	switch (type)
	{
		case MESSAGETYPE_HELLO:
		{
			HelloMessage hello(data, dataSize);
			DBG_PRINT(("HelloMessage: version = %d\n", hello.version));

			const bool versionOk = (hello.version == PROTOCOL_VERSION);
			if (!versionOk)
				throw ProtocolError("Unsupported protocol version");
			break;
		}

		case MESSAGETYPE_TEST:
		{
			// Only decoded and printed.
			TestMessage testMsg(data, dataSize);
			DBG_PRINT(("TestMessage: '%s'\n", testMsg.test.c_str()));
			break;
		}

		case MESSAGETYPE_KEEPALIVE:
		{
			// The message object is constructed from the data but not used further.
			KeepAliveMessage keepAlive(data, dataSize);
			DBG_PRINT(("KeepAliveMessage\n"));
			keepAliveReceived();
			break;
		}

		case MESSAGETYPE_EXECUTE_BINARY:
		{
			ExecuteBinaryMessage exec(data, dataSize);
			// Only the first 10 characters of the case list are printed.
			DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", exec.name.c_str(), exec.params.c_str(), exec.workDir.c_str(), exec.caseList.substr(0, 10).c_str()));

			TestDriver* const driver = getTestDriver();
			driver->startProcess(exec.name.c_str(), exec.params.c_str(), exec.workDir.c_str(), exec.caseList.c_str());

			keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
			break;
		}

		case MESSAGETYPE_STOP_EXECUTION:
		{
			StopExecutionMessage stop(data, dataSize);
			DBG_PRINT(("StopExecutionMessage\n"));

			TestDriver* const driver = getTestDriver();
			driver->stopProcess();
			break;
		}

		default:
			throw ProtocolError("Unsupported message");
	}
}
void ExecutionRequestHandler::initKeepAlives (void)
{
deUint64 curTime = deGetMicroseconds();
m_lastKeepAliveSent = curTime;
m_lastKeepAliveReceived = curTime;
}
void ExecutionRequestHandler::keepAliveReceived (void)
{
m_lastKeepAliveReceived = deGetMicroseconds();
}
void ExecutionRequestHandler::pollKeepAlives (void)
{
deUint64 curTime = deGetMicroseconds();
// Check that we've got keepalives in timely fashion.
if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
throw ProtocolError("Keepalive timeout occurred");
// Send some?
if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
{
vector<deUint8> buf;
KeepAliveMessage().write(buf);
m_bufferOut.pushFront(&buf[0], (int)buf.size());
m_lastKeepAliveSent = deGetMicroseconds();
}
}
bool ExecutionRequestHandler::receive (void)
{
size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
if (maxLen > 0)
{
size_t numRecv;
deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
if (result == DE_SOCKETRESULT_SUCCESS)
{
DE_ASSERT(numRecv > 0);
m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
return true;
}
else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
{
m_run = false;
return true;
}
else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
return false;
else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
throw ConnectionError("Connection terminated");
else
throw ConnectionError("receive() failed");
}
else
return false;
}
bool ExecutionRequestHandler::send (void)
{
size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
if (maxLen > 0)
{
m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
size_t numSent;
deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
if (result == DE_SOCKETRESULT_SUCCESS)
{
DE_ASSERT(numSent > 0);
m_bufferOut.popBack((int)numSent);
return true;
}
else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
{
m_run = false;
return true;
}
else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
return false;
else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
throw ConnectionError("Connection terminated");
else
throw ConnectionError("send() failed");
}
else
return false;
}
} // xs