#pylint: disable-msg=C0111 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Fakes for dynamic_suite-related unit tests.""" import common from autotest_lib.client.common_lib import control_data class FakeControlData(control_data.ControlData): """A fake parsed control file data structure.""" def __init__(self, suite, attributes, data, time='LONG', expr=False, dependencies=None, job_retries=0): self.string = 'text-' + data self.name = 'name-' + data self.path = None # Will be set during 'parsing'. self.data = data self.suite = suite self.attributes = attributes self.test_type = 'Client' self.experimental = expr if not dependencies: dependencies=[] self.dependencies = dependencies self.time = time self.sync_count = 1 self.job_retries = job_retries self.bug_template = {} self.require_ssp = None self.priority = 10 self.fast = False class FakeJob(object): """Faked out RPC-client-side Job object.""" def __init__(self, id=0, statuses=[], hostnames=[], parent_job_id=None): self.id = id self.hostnames = hostnames if hostnames else ['host%d' % id] self.owner = 'tester' self.name = 'Fake Job %d' % self.id self.statuses = statuses self.parent_job_id = parent_job_id class FakeHost(object): """Faked out RPC-client-side Host object.""" def __init__(self, hostname='', status='Ready', locked=False, locked_by=''): self.hostname = hostname self.status = status self.locked = locked self.locked_by = locked_by def __str__(self): return '%s: %s. %s%s' % ( self.hostname, self.status, 'Locked' if self.locked else 'Unlocked', ' by %s' % self.locked_by if self.locked else '') class FakeLabel(object): """Faked out RPC-client-side Label object.""" def __init__(self, id=0): self.id = id class FakeStatus(object): """Fake replacement for server-side job status objects. @var status: 'GOOD', 'FAIL', 'ERROR', etc. @var test_name: name of the test this is status for @var reason: reason for failure, if any @var aborted: present and True if the job was aborted. Optional. """ def __init__(self, code, name, reason, aborted=None, hostname=None, subdir='fake_Test.tag.subdir_tag', job_tag='id-owner/hostname'): self.status = code self.test_name = name self.reason = reason self.hostname = hostname if hostname else 'hostless' self.entry = {} self.test_started_time = '2012-11-11 11:11:11' self.test_finished_time = '2012-11-11 12:12:12' self.job_tag=job_tag self.subdir=subdir if aborted: self.entry['aborted'] = True if hostname: self.entry['host'] = {'hostname': hostname} def __repr__(self): return '%s\t%s\t%s: %s' % (self.status, self.test_name, self.reason, self.hostname) def equals_record(self, status): """Compares this object to a recorded status.""" if 'aborted' in self.entry and self.entry['aborted']: return status._status == 'ABORT' return (self.status == status._status and status._test_name.endswith(self.test_name) and self.reason == status._reason) def equals_hostname_record(self, status): """Compares this object to a recorded status. Expects the test name field of |status| to contain |self.hostname|. """ return (self.status == status._status and self.hostname in status._test_name and self.reason == status._reason) def record_all(self, record): pass def is_good(self): pass def name(self): return self.test_name class FakeMultiprocessingPool(object): """Fake multiprocessing pool to mock out the map method.""" def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): pass def map(self, func, iterable, chunksize=None): """Use the standard map() built-in instead of Pool.map()""" return map(func, iterable) def close(self): pass def join(self): pass