普通文本  |  377行  |  12.32 KB

#!/usr/bin/python
"""
Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
    If the job_id is a suite it will find all subjobs.
You need to change the location of the log it will parse.
The job_id needs to be in the afe database.
"""
import abc
import datetime
import os
import re
import pprint
import subprocess
import sys
import time

import common
from autotest_lib.server import frontend


LOGFIE = './logs/scheduler.log.2014-04-17-16.51.47'
# logfile name format: scheduler.log.2014-02-14-18.10.56
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')

class StateMachineViolation(Exception):
    pass


class LogLineException(Exception):
    pass


def should_process_log(time_str, time_format, cutoff_days=7):
    """Returns true if the logs was created after cutoff days.

    @param time_str: A string representing the time.
        eg: 2014-02-14-18.10.56
    @param time_format: A string representing the format of the time string.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int representind the cutoff in days.

    @return: Returns True if time_str has aged more than cutoff_days.
    """
    log_time = datetime.datetime.strptime(time_str, time_format)
    now = datetime.datetime.strptime(time.strftime(time_format), time_format)
    cutoff = now - datetime.timedelta(days=cutoff_days)
    return log_time < cutoff


def apply_regex(regex, line):
    """Simple regex applicator.

    @param regex: Regex to apply.
    @param line: The line to apply regex on.

    @return: A tuple with the matching groups, if there was a match.
    """
    log_match  = re.match(regex, line)
    if log_match:
        return log_match.groups()


class StateMachineParser(object):
    """Abstract class that enforces state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary. The SMP will pop 'to' states
    from the dictionary as they occur, so you cannot same state transitions
    unless you specify 2 of them.
    """
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Checks that a transition is valid.

        @param from_state: A string representind the state the host is leaving.
        @param to_state: The state The host is going to, represented as a string.

        @raises LogLineException: If an invalid state transition was
            detected.
        """
        # TODO: Updating to the same state is a waste of bw.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))
            return

        if (from_state in self.expected_transitions and
            to_state in self.expected_transitions[from_state]):
            self.expected_transitions[from_state].remove(to_state)
            self.visited_states.append(to_state)
        else:
            return (from_state, to_state)


class SingleJobHostSMP(StateMachineParser):
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
                'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
                'Resetting': ['Ready', 'Provisioning'],
                'Pending': ['Running'],
                'Provisioning': ['Repairing'],
                'Running': ['Ready']
        }


    def check_transitions(self, hostline):
        if hostline.line_info['field'] == 'status':
            self.advance_state(hostline.line_info['state'],
                    hostline.line_info['value'])


class SingleJobHqeSMP(StateMachineParser):
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
                'Queued': ['Starting', 'Resetting', 'Aborted'],
                'Resetting': ['Pending', 'Provisioning'],
                'Provisioning': ['Pending', 'Queued', 'Repairing'],
                'Pending': ['Starting'],
                'Starting': ['Running'],
                'Running': ['Gathering', 'Parsing'],
                'Gathering': ['Parsing'],
                'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        invalid_states = self.advance_state(
                hqeline.line_info['from_state'], hqeline.line_info['to_state'])
        if not invalid_states:
            return

        # Deal with repair.
        if (invalid_states[0] == 'Queued' and
            'Running' in self.visited_states):
            raise StateMachineViolation('Unrecognized state transition '
                    '%s->%s, expected transitions are %s' %
                    (invalid_states[0], invalid_states[1],
                     self.expected_transitions))


class LogLine(object):
    """Line objects.

    All classes inheriting from LogLine represent a line of some sort.
    A line is responsible for parsing itself, and invoking an SMP to
    validate state transitions. A line can be part of several state machines.
    """
    line_format = '%s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects to use to validate
            state changes on these types of lines..
        """
        self.smps = state_machine_parsers

        # Because, this is easier to flush.
        self.line_info = {}


    def parse_line(self, line):
        """Apply a line regex and save any information the parsed line contains.

        @param line: A string representing a line.
        """
        # Regex for all the things.
        line_rgx = '(.*)'
        parsed_line = apply_regex(line_rgx, line)
        if parsed_line:
            self.line_info['line'] = parsed_line[0]


    def flush(self):
        """Call any state machine parsers, persist line info if needed.
        """
        for smp in self.smps:
            smp.check_transitions(self)
        # TODO: persist this?
        self.line_info={}


    def format_line(self):
        try:
            return self.line_format % self.line_info
        except KeyError:
            return self.line_info['line']


class TimeLine(LogLine):
    """Filters timestamps for scheduler logs.
    """

    def parse_line(self, line):
        super(TimeLine, self).parse_line(line)

        # Regex for isolating the date and time from scheduler logs, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        line_rgx = '([0-9,/,:,., ]+)(.*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['time'] = parsed_line[0]
            self.line_info['line'] = parsed_line[1]


class HostLine(TimeLine):
    """Manages hosts line parsing.
    """
    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*"
        match = apply_regex(state_transition_rgx, line)
        if match:
            self.line_info['state'] = match[0]
            self.line_info['field'] = match[1]
            self.line_info['value'] = match[2].replace("'", "")


    def parse_line(self, line):
        super(HostLine, self).parse_line(line)

        # Regex for getting host status. Eg:
        # 172.22.4 in Running updating {'status': 'Running'}
        line_rgx = '.*Host (([0-9,.,a-z,-]+).*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['line'] = parsed_line[0]
            self.line_info['host'] = parsed_line[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()


class HQELine(TimeLine):
    """Manages HQE line parsing.
    """
    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
            'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        # Regex for getting hqe status. Eg:
        # status:Running [active] -> Gathering
        state_transition_rgx = ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)"
        match = apply_regex(state_transition_rgx, line)
        if match:
            self.line_info['from_state'] = match[0]
            self.line_info['flags'] = match[1]
            self.line_info['to_state'] = match[2]


    def parse_line(self, line):
        super(HQELine, self).parse_line(line)
        line_rgx = r'.*\| HQE: (([0-9]+).*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['line'] = parsed_line[0]
            self.line_info['hqe'] = parsed_line[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()


class LogCrawler(object):
    """Crawl logs.

    Log crawlers are meant to apply some basic preprocessing to a log, and crawl
    the output validating state changes. They manage line and state machine
    creation. The initial filtering applied to the log needs to be grab all lines
    that match an action, such as the running of a job.
    """

    def __init__(self, log_name):
        self.log = log_name
        self.filter_command = 'cat %s' % log_name


    def preprocess_log(self):
        """Apply some basic filtering to the log.
        """
        proc = subprocess.Popen(self.filter_command,
                shell=True, stdout=subprocess.PIPE)
        out, err = proc.communicate()
        return out


class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                HQELine([SingleJobHqeSMP()])]
        self.filter_command = ('%s | grep "for job: %s"' %
                (self.filter_command, self.job_id))


    def parse_log(self):
        """Parse each line of the preprocessed log output.

        Pass each line through each possible line_processor. The one that matches
        will populate itself, call flush, this will walk the state machine of that
        line to the next step.
        """
        out = self.preprocess_log()
        response = []
        for job_line in out.split('\n'):
            parsed_line = None
            for processor in self.line_processors:
                line = processor.parse_line(job_line)
                if line and parsed_line:
                    raise LogLineException('Multiple Parsers claiming the line %s: '
                            'previous parsing: %s, current parsing: %s ' %
                            (job_line, parsed_line, line))
                elif line:
                    parsed_line = line
                    try:
                        processor.flush()
                    except StateMachineViolation as e:
                        response.append(str(e))
                        raise StateMachineViolation(response)
            response.append(parsed_line if parsed_line else job_line)
        return response


def process_logs():
    if len(sys.argv) < 2:
        print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
               'You need to change the location of the log it will parse.'
                'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    rpc = frontend.AFE()
    suite_jobs = rpc.run('get_jobs', id=job_id)
    if not suite_jobs[0]['parent_job']:
        suite_jobs = rpc.run('get_jobs', parent_job=job_id)
    try:
        logfile = sys.argv[2]
    except Exception:
        logfile = LOGFILE

    for job in suite_jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print line
    return


if __name__ == '__main__':
    process_logs()