# Copyright 2015 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """ This module is designed to report metadata in a separated thread to avoid the performance overhead of sending data to Elasticsearch using HTTP. """ import logging import Queue import time import threading import common from autotest_lib.client.common_lib.cros.graphite import autotest_es from autotest_lib.scheduler import email_manager # The metadata_reporter thread runs inside scheduler process, thus it doesn't # need to setup django, otherwise, following import is needed: # from autotest_lib.frontend import setup_django_environment from autotest_lib.site_utils import server_manager_utils # Number of seconds to wait before checking queue again for uploading data. _REPORT_INTERVAL_SECONDS = 5 _MAX_METADATA_QUEUE_SIZE = 1000000 _MAX_UPLOAD_SIZE = 50000 # The number of seconds for upload to fail continuously. After that, upload will # be limited to 1 entry. _MAX_UPLOAD_FAIL_DURATION = 600 # Number of entries to retry when the previous upload failed continueously for # the duration of _MAX_UPLOAD_FAIL_DURATION. _MIN_RETRY_ENTRIES = 10 # Queue to buffer metadata to be reported. metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE) _report_lock = threading.Lock() _abort = threading.Event() _queue_full = threading.Event() def queue(data): """Queue metadata to be uploaded in reporter thread. If the queue is full, an error will be logged for the first time the queue becomes full. The call does not wait or raise Queue.Full exception, so there is no overhead on the performance of caller, e.g., scheduler. @param data: A metadata entry, which should be a dictionary. """ try: metadata_queue.put_nowait(data) if _queue_full.is_set(): logging.info('Metadata queue is available to receive new data ' 'again.') _queue_full.clear() except Queue.Full: if not _queue_full.is_set(): _queue_full.set() logging.error('Metadata queue is full, cannot report data. ' 'Consider increasing the value of ' '_MAX_METADATA_QUEUE_SIZE. Its current value is set ' 'to %d.', _MAX_METADATA_QUEUE_SIZE) def _email_alert(): """ """ if not server_manager_utils.use_server_db(): logging.debug('Server database not emailed, email alert is skipped.') return try: server_manager_utils.confirm_server_has_role(hostname='localhost', role='scheduler') except server_manager_utils.ServerActionError: # Only email alert if the server is a scheduler, not shard. return subject = ('Metadata upload has been failing for %d seconds' % _MAX_UPLOAD_FAIL_DURATION) email_manager.manager.enqueue_notify_email(subject, '') email_manager.manager.send_queued_emails() def _run(): """Report metadata in the queue until being aborted. """ # Time when the first time upload failed. None if the last upload succeeded. first_failed_upload = None # True if email alert was sent when upload has been failing continuously # for _MAX_UPLOAD_FAIL_DURATION seconds. email_alert = False upload_size = _MIN_RETRY_ENTRIES try: while True: start_time = time.time() data_list = [] if (first_failed_upload and time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION): upload_size = _MIN_RETRY_ENTRIES if not email_alert: _email_alert() email_alert = True else: upload_size = min(upload_size*2, _MAX_UPLOAD_SIZE) while (not metadata_queue.empty() and len(data_list) < upload_size): data_list.append(metadata_queue.get_nowait()) if data_list: if autotest_es.bulk_post(data_list=data_list): time_used = time.time() - start_time logging.info('%d entries of metadata uploaded in %s ' 'seconds.', len(data_list), time_used) first_failed_upload = None email_alert = False else: logging.warn('Failed to upload %d entries of metadata, ' 'they will be retried later.', len(data_list)) for data in data_list: queue(data) if not first_failed_upload: first_failed_upload = time.time() sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time if sleep_time < 0: sleep_time = 0.5 _abort.wait(timeout=sleep_time) except Exception as e: logging.error('Metadata reporter thread failed with error: %s', e) raise finally: logging.info('Metadata reporting thread is exiting.') _abort.clear() _report_lock.release() def start(): """Start the thread to report metadata. """ # The lock makes sure there is only one reporting thread working. if _report_lock.locked(): logging.error('There is already a metadata reporter thread.') return _report_lock.acquire() reporting_thread = threading.Thread(target=_run) # Make it a daemon thread so it doesn't need to be closed explicitly. reporting_thread.setDaemon(True) reporting_thread.start() logging.info('Metadata reporting thread is started.') def abort(): """Abort the thread to report metadata. The call will wait up to 5 seconds for existing data to be uploaded. """ if not _report_lock.locked(): logging.error('The metadata reporting thread has already exited.') return _abort.set() logging.info('Waiting up to %s seconds for metadata reporting thread to ' 'complete.', _REPORT_INTERVAL_SECONDS) _abort.wait(_REPORT_INTERVAL_SECONDS)