#
# Copyright 2008 Google Inc. All Rights Reserved.
#
import os
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import global_config, utils
GLOBAL_CONFIG = global_config.global_config
DEFAULT_SERVER = 'autotest'
AFE_RPC_PATH = '/afe/server/noauth/rpc/'
TKO_RPC_PATH = '/new_tko/server/noauth/rpc/'
class AuthError(Exception):
pass
def get_autotest_server(web_server=None):
if not web_server:
if 'AUTOTEST_WEB' in os.environ:
web_server = os.environ['AUTOTEST_WEB']
else:
web_server = GLOBAL_CONFIG.get_config_value(
'SERVER', 'hostname', default=DEFAULT_SERVER)
web_server = rpc_client_lib.add_protocol(web_server)
return web_server
class rpc_comm(object):
"""Shared AFE/TKO RPC class stuff"""
def __init__(self, web_server, rpc_path, username):
self.username = username
self.web_server = get_autotest_server(web_server)
try:
self.proxy = self._connect(rpc_path)
except rpc_client_lib.AuthError, s:
raise AuthError(s)
def _connect(self, rpc_path):
# This does not fail even if the address is wrong.
# We need to wait for an actual RPC to fail
headers = rpc_client_lib.authorization_headers(self.username,
self.web_server)
rpc_server = self.web_server + rpc_path
return rpc_client_lib.get_proxy(rpc_server, headers=headers)
def run(self, op, *args, **data):
if 'AUTOTEST_CLI_DEBUG' in os.environ:
print self.web_server, op, args, data
function = getattr(self.proxy, op)
result = function(*args, **data)
if 'AUTOTEST_CLI_DEBUG' in os.environ:
print 'result:', result
return result
class afe_comm(rpc_comm):
"""Handles the AFE setup and communication through RPC"""
def __init__(self, web_server=None, rpc_path=AFE_RPC_PATH, username=None):
super(afe_comm, self).__init__(web_server, rpc_path, username)
class tko_comm(rpc_comm):
"""Handles the TKO setup and communication through RPC"""
def __init__(self, web_server=None, rpc_path=TKO_RPC_PATH, username=None):
super(tko_comm, self).__init__(web_server, rpc_path, username)