# Plain text  |  169 lines  |  4.75 KB

# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Job leasing utilities

Jobs are leased to processes to own and run.  A process that owns a job
obtains a job lease.  Ongoing ownership of the lease is established using
an exclusive fcntl lock on the lease file.

If a lease file is older than a few seconds and is not locked, then its
owning process should be considered crashed.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import contextlib
import errno
import fcntl
import logging
import os
import socket
import time

from scandir import scandir

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


@contextlib.contextmanager
def obtain_lease(path):
    """Create and hold a lease file for the duration of a with block.

    The lease file is opened for writing (created or truncated) and an
    exclusive, non-blocking fcntl lock is taken on it.  The lock is held
    for as long as the context is active.  On exit, whether normal or
    via an exception, the lease file is unlinked and then closed.

    If the lock cannot be acquired, the error raised by fcntl.lockf
    propagates and the lease file is left in place.

    @param path: path of the lease file
    @returns: context manager yielding path
    """
    lock_flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    lease_file = open(path, 'w')
    try:
        fcntl.lockf(lease_file.fileno(), lock_flags)
        try:
            yield path
        finally:
            # Remove the file while the lock is still held.
            os.unlink(path)
    finally:
        lease_file.close()


def leases_iter(jobdir):
    """Iterate over the leases found in a job directory.

    Every directory entry that _is_lease_entry() accepts is wrapped in
    a Lease; all other entries are skipped.

    @param jobdir: job lease file directory
    @returns: iterator of Leases
    """
    for dir_entry in scandir(jobdir):
        if not _is_lease_entry(dir_entry):
            continue
        yield Lease(dir_entry)


class Lease(object):
    """Represents a job lease.

    A Lease wraps the directory entry of a lease file.  The abort
    socket associated with the lease lives next to the lease file, at
    the lease file's path with ".sock" appended.
    """

    # Seconds after a lease file's mtime where its owning process is not
    # considered dead.
    _FRESH_LIMIT = 5

    def __init__(self, entry):
        """Initialize instance.

        @param entry: scandir.DirEntry instance
        """
        self._entry = entry

    @property
    def id(self):
        """Return id of leased job (the lease file name as an int)."""
        return int(self._entry.name)

    def expired(self):
        """Return True if the lease is expired.

        A lease is considered expired if there is no fcntl lock on it
        and the grace period for the owning process to obtain the lock
        has passed.  The lease is not considered expired if the owning
        process removed the lock file normally, as an expired lease
        indicates that some error has occurred and clean up operations
        are needed.

        @returns: True if expired, False otherwise
        @raises OSError: if stat fails for a reason other than ENOENT
        """
        try:
            stat_result = self._entry.stat()
        except OSError as e:  # pragma: no cover
            # A missing file means the owner cleaned up normally.
            if e.errno == errno.ENOENT:
                return False
            raise
        mtime = stat_result.st_mtime_ns / (10 ** 9)
        # Still inside the grace period; the owner may not have locked yet.
        if time.time() - mtime < self._FRESH_LIMIT:
            return False
        return not _fcntl_locked(self._entry.path)

    def cleanup(self):
        """Remove the lease file and its abort socket file.

        This does not need to be called normally, as the owning process
        should clean up its files.  Removal errors are logged, not
        raised.
        """
        try:
            os.unlink(self._entry.path)
        except OSError as e:
            logger.warning('Error removing %s: %s', self._entry.path, e)
        try:
            os.unlink(self._sock_path)
        except OSError as e:
            # This is fine; it means that job_reporter crashed, but
            # lucifer was able to run its cleanup.
            logger.debug('Error removing %s: %s', self._sock_path, e)

    def abort(self):
        """Abort the job.

        This sends a datagram to the abort socket associated with the
        lease.

        If the socket is closed, either the connect() call or the send()
        call will raise socket.error with ECONNREFUSED.

        The client socket is always closed before returning or raising.
        """
        sock_path = self._sock_path
        # closing() releases the descriptor on every path, including when
        # connect() or send() raises.
        with contextlib.closing(
                socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)) as sock:
            sock.setblocking(0)
            logger.debug('Connecting to abort socket %s', sock_path)
            sock.connect(sock_path)
            logger.debug('Sending abort to %s', sock_path)
            # The value sent does not matter.  It must be a bytes literal:
            # send() rejects text strings on Python 3.
            sent = sock.send(b'abort')
        # TODO(ayatane): I don't know if it is possible for sent to be 0
        assert sent > 0

    def maybe_abort(self):
        """Abort the job, ignoring (but logging) socket errors."""
        try:
            self.abort()
        except socket.error as e:
            logger.debug('Error aborting socket: %s', e)

    @property
    def _sock_path(self):
        """Return the path of the abort socket corresponding to the lease."""
        return self._entry.path + ".sock"


def _is_lease_entry(entry):
    """Tell whether a DirEntry names a lease file.

    An entry counts as a lease when its name consists only of digits.

    @param entry: directory entry with a name attribute
    @returns: True if the entry is for a lease
    """
    filename = entry.name
    return filename.isdigit()


def _fcntl_locked(path):
    """Probe whether a file is fcntl locked.

    The probe tries to take an exclusive, non-blocking lock on the file
    and reports the file as locked if that attempt fails.  A file that
    cannot be opened for writing is reported as not locked.  The
    descriptor used for the probe is always closed before returning.

    @param path: path to file
    @returns: True if the lock attempt failed, False otherwise
    """
    try:
        descriptor = os.open(path, os.O_WRONLY)
    except (IOError, OSError):
        return False
    try:
        fcntl.lockf(descriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)
        is_locked = False
    except IOError:
        is_locked = True
    finally:
        os.close(descriptor)
    return is_locked