#!/usr/bin/python
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import itertools
import mock
import unittest
import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import hosts
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.server.hosts import cros_firmware
from autotest_lib.server.hosts import cros_repair
from autotest_lib.server.hosts import repair_utils
CROS_VERIFY_DAG = (
(repair_utils.SshVerifier, 'ssh', ()),
(cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
(cros_repair.HWIDVerifier, 'hwid', ('ssh',)),
(cros_repair.ACPowerVerifier, 'power', ('ssh',)),
(cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
(cros_repair.WritableVerifier, 'writable', ('ssh',)),
(cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
(cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
(cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
(cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
(cros_repair.PythonVerifier, 'python', ('ssh',)),
(repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
(cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
)
CROS_REPAIR_ACTIONS = (
(repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
(cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
(cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
(cros_firmware.FirmwareRepair,
'firmware', (), ('ssh', 'fwstatus', 'good_au')),
(cros_repair.CrosRebootRepair,
'reboot', ('ssh',), ('devmode', 'writable',)),
(cros_repair.ColdRebootRepair,
'coldboot', ('ssh',), ('ec_reset',)),
(cros_repair.AutoUpdateRepair,
'au',
('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
('power', 'rwfw', 'python', 'cros')),
(cros_repair.PowerWashRepair,
'powerwash',
('ssh', 'writable'),
('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros')),
(cros_repair.ServoInstallRepair,
'usb',
(),
('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw',
'python', 'cros')),
)
MOBLAB_VERIFY_DAG = (
(repair_utils.SshVerifier, 'ssh', ()),
(cros_repair.ACPowerVerifier, 'power', ('ssh',)),
(cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
(cros_repair.PythonVerifier, 'python', ('ssh',)),
(repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
)
MOBLAB_REPAIR_ACTIONS = (
(repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
(cros_repair.AutoUpdateRepair,
'au', ('ssh',), ('power', 'rwfw', 'python', 'cros',)),
)
JETSTREAM_VERIFY_DAG = (
(repair_utils.SshVerifier, 'ssh', ()),
(cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
(cros_repair.HWIDVerifier, 'hwid', ('ssh',)),
(cros_repair.ACPowerVerifier, 'power', ('ssh',)),
(cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
(cros_repair.WritableVerifier, 'writable', ('ssh',)),
(cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
(cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
(cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
(cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
(cros_repair.PythonVerifier, 'python', ('ssh',)),
(repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
(cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
(cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)),
(cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
('ssh',)),
(cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)),
)
JETSTREAM_REPAIR_ACTIONS = (
(repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
(cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
(cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
(cros_firmware.FirmwareRepair,
'firmware', (), ('ssh', 'fwstatus', 'good_au')),
(cros_repair.CrosRebootRepair,
'reboot', ('ssh',), ('devmode', 'writable',)),
(cros_repair.ColdRebootRepair,
'coldboot', ('ssh',), ('ec_reset',)),
(cros_repair.JetstreamTpmRepair,
'jetstream_tpm_repair',
('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
'jetstream_attestation')),
(cros_repair.JetstreamServiceRepair,
'jetstream_service_repair',
('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm',
'jetstream_attestation'),
('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
'jetstream_attestation', 'jetstream_services')),
(cros_repair.AutoUpdateRepair,
'au',
('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
'jetstream_attestation', 'jetstream_services')),
(cros_repair.PowerWashRepair,
'powerwash',
('ssh', 'writable'),
('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
(cros_repair.ServoInstallRepair,
'usb',
(),
('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python',
'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
)
CRYPTOHOME_STATUS_OWNED = """{
"installattrs": {
"first_install": true,
"initialized": true,
"invalid": false,
"lockbox_index": 536870916,
"lockbox_nvram_version": 2,
"secure": true,
"size": 0,
"version": 1
},
"mounts": [ ],
"tpm": {
"being_owned": false,
"can_connect": true,
"can_decrypt": false,
"can_encrypt": false,
"can_load_srk": true,
"can_load_srk_pubkey": true,
"enabled": true,
"has_context": true,
"has_cryptohome_key": false,
"has_key_handle": false,
"last_error": 0,
"owned": true
}
}
"""
CRYPTOHOME_STATUS_NOT_OWNED = """{
"installattrs": {
"first_install": true,
"initialized": true,
"invalid": false,
"lockbox_index": 536870916,
"lockbox_nvram_version": 2,
"secure": true,
"size": 0,
"version": 1
},
"mounts": [ ],
"tpm": {
"being_owned": false,
"can_connect": true,
"can_decrypt": false,
"can_encrypt": false,
"can_load_srk": false,
"can_load_srk_pubkey": false,
"enabled": true,
"has_context": true,
"has_cryptohome_key": false,
"has_key_handle": false,
"last_error": 0,
"owned": false
}
}
"""
CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
"installattrs": {
"first_install": true,
"initialized": true,
"invalid": false,
"lockbox_index": 536870916,
"lockbox_nvram_version": 2,
"secure": true,
"size": 0,
"version": 1
},
"mounts": [ ],
"tpm": {
"being_owned": false,
"can_connect": true,
"can_decrypt": false,
"can_encrypt": false,
"can_load_srk": false,
"can_load_srk_pubkey": false,
"enabled": true,
"has_context": true,
"has_cryptohome_key": false,
"has_key_handle": false,
"last_error": 0,
"owned": true
}
}
"""
TPM_STATUS_READY = """
TPM Enabled: true
TPM Owned: true
TPM Being Owned: false
TPM Ready: true
TPM Password: 9eaee4da8b4c
"""
TPM_STATUS_NOT_READY = """
TPM Enabled: true
TPM Owned: false
TPM Being Owned: true
TPM Ready: false
TPM Password:
"""
class CrosRepairUnittests(unittest.TestCase):
# pylint: disable=missing-docstring
maxDiff = None
def test_cros_repair(self):
verify_dag = cros_repair._cros_verify_dag()
self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG)
self.check_verify_dag(verify_dag)
repair_actions = cros_repair._cros_repair_actions()
self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS)
self.check_repair_actions(verify_dag, repair_actions)
def test_moblab_repair(self):
verify_dag = cros_repair._moblab_verify_dag()
self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG)
self.check_verify_dag(verify_dag)
repair_actions = cros_repair._moblab_repair_actions()
self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS)
self.check_repair_actions(verify_dag, repair_actions)
def test_jetstream_repair(self):
verify_dag = cros_repair._jetstream_verify_dag()
self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG)
self.check_verify_dag(verify_dag)
repair_actions = cros_repair._jetstream_repair_actions()
self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS)
self.check_repair_actions(verify_dag, repair_actions)
def check_verify_dag(self, verify_dag):
"""Checks that dependency labels are defined."""
labels = [n[1] for n in verify_dag]
for node in verify_dag:
for dep in node[2]:
self.assertIn(dep, labels)
def check_repair_actions(self, verify_dag, repair_actions):
"""Checks that dependency and trigger labels are defined."""
verify_labels = [n[1] for n in verify_dag]
for action in repair_actions:
deps = action[2]
triggers = action[3]
for label in deps + triggers:
self.assertIn(label, verify_labels)
def test_get_cryptohome_status_owned(self):
mock_host = mock.Mock()
mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
status = cros_repair.CryptohomeStatus(mock_host)
self.assertDictEqual({
'being_owned': False,
'can_connect': True,
'can_decrypt': False,
'can_encrypt': False,
'can_load_srk': True,
'can_load_srk_pubkey': True,
'enabled': True,
'has_context': True,
'has_cryptohome_key': False,
'has_key_handle': False,
'last_error': 0,
'owned': True,
}, status['tpm'])
self.assertTrue(status.tpm_enabled)
self.assertTrue(status.tpm_owned)
self.assertTrue(status.tpm_can_load_srk)
self.assertTrue(status.tpm_can_load_srk_pubkey)
def test_get_cryptohome_status_not_owned(self):
mock_host = mock.Mock()
mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
status = cros_repair.CryptohomeStatus(mock_host)
self.assertDictEqual({
'being_owned': False,
'can_connect': True,
'can_decrypt': False,
'can_encrypt': False,
'can_load_srk': False,
'can_load_srk_pubkey': False,
'enabled': True,
'has_context': True,
'has_cryptohome_key': False,
'has_key_handle': False,
'last_error': 0,
'owned': False,
}, status['tpm'])
self.assertTrue(status.tpm_enabled)
self.assertFalse(status.tpm_owned)
self.assertFalse(status.tpm_can_load_srk)
self.assertFalse(status.tpm_can_load_srk_pubkey)
@mock.patch.object(cros_repair, '_is_virtual_machine')
def test_tpm_status_verifier_owned(self, mock_is_virt):
mock_is_virt.return_value = False
mock_host = mock.Mock()
mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
tpm_verifier.verify(mock_host)
@mock.patch.object(cros_repair, '_is_virtual_machine')
def test_tpm_status_verifier_not_owned(self, mock_is_virt):
mock_is_virt.return_value = False
mock_host = mock.Mock()
mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
tpm_verifier.verify(mock_host)
@mock.patch.object(cros_repair, '_is_virtual_machine')
def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
mock_is_virt.return_value = False
mock_host = mock.Mock()
mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK
tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
tpm_verifier.verify(mock_host)
self.assertEqual('Cannot load the TPM SRK',
ctx.exception.message)
def test_jetstream_tpm_owned(self):
mock_host = mock.Mock()
mock_host.run.side_effect = [
mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
mock.Mock(stdout=TPM_STATUS_READY),
]
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
tpm_verifier.verify(mock_host)
@mock.patch.object(retry.logging, 'warning')
@mock.patch.object(retry.time, 'time')
@mock.patch.object(retry.time, 'sleep')
def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
mock_time.side_effect = itertools.count(0, 20)
mock_host = mock.Mock()
mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
tpm_verifier.verify(mock_host)
self.assertEqual('TPM is not owned', ctx.exception.message)
@mock.patch.object(retry.logging, 'warning')
@mock.patch.object(retry.time, 'time')
@mock.patch.object(retry.time, 'sleep')
def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
mock_time.side_effect = itertools.count(0, 20)
mock_host = mock.Mock()
mock_host.run.side_effect = itertools.cycle([
mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
mock.Mock(stdout=TPM_STATUS_NOT_READY),
])
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
tpm_verifier.verify(mock_host)
self.assertEqual('TPM is not ready', ctx.exception.message)
@mock.patch.object(retry.logging, 'warning')
@mock.patch.object(retry.time, 'time')
@mock.patch.object(retry.time, 'sleep')
def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
mock_logging):
mock_time.side_effect = itertools.count(0, 20)
mock_host = mock.Mock()
mock_host.run.side_effect = error.AutoservRunError('test', None)
tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
with self.assertRaises(hosts.AutoservVerifyError) as ctx:
tpm_verifier.verify(mock_host)
self.assertEqual('Could not determine TPM status',
ctx.exception.message)
if __name__ == '__main__':
unittest.main()