普通文本  |  266行  |  9.29 KB

#! python
#
# Python Serial Port Extension for Win32, Linux, BSD, Jython
# see __init__.py
#
# This module implements a loop back connection receiving itself what it sent.
#
# The purpose of this module is.. well... You can run the unit tests with it.
# and it was so easy to implement ;-)
#
# (C) 2001-2011 Chris Liechti <cliechti@gmx.net>
# this is distributed under a free software license, see license.txt
#
# URL format:    loop://[option[/option...]]
# options:
# - "debug" print diagnostic messages

from serial.serialutil import *
import threading
import time
import logging

# map log level names to constants. used in fromURL()
LOGGER_LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    }


class LoopbackSerial(SerialBase):
    """Serial port implementation that simulates a loop back connection in plain software."""

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def open(self):
        """Open port with current settings. This may throw a SerialException
           if the port cannot be opened."""
        if self._isOpen:
            raise SerialException("Port is already open.")
        self.logger = None
        self.buffer_lock = threading.Lock()
        self.loop_buffer = bytearray()
        self.cts = False
        self.dsr = False

        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        # not that there is anything to open, but the function applies the
        # options found in the URL
        self.fromURL(self.port)

        # not that there anything to configure...
        self._reconfigurePort()
        # all things set up get, now a clean start
        self._isOpen = True
        if not self._rtscts:
            self.setRTS(True)
            self.setDTR(True)
        self.flushInput()
        self.flushOutput()

    def _reconfigurePort(self):
        """Set communication parameters on opened port. for the loop://
        protocol all settings are ignored!"""
        # not that's it of any real use, but it helps in the unit tests
        if not isinstance(self._baudrate, (int, long)) or not 0 < self._baudrate < 2**32:
            raise ValueError("invalid baudrate: %r" % (self._baudrate))
        if self.logger:
            self.logger.info('_reconfigurePort()')

    def close(self):
        """Close port"""
        if self._isOpen:
            self._isOpen = False
            # in case of quick reconnects, give the server some time
            time.sleep(0.3)

    def makeDeviceName(self, port):
        raise SerialException("there is no sensible way to turn numbers into URLs")

    def fromURL(self, url):
        """extract host and port from an URL string"""
        if url.lower().startswith("loop://"): url = url[7:]
        try:
            # process options now, directly altering self
            for option in url.split('/'):
                if '=' in option:
                    option, value = option.split('=', 1)
                else:
                    value = None
                if not option:
                    pass
                elif option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[value])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError, e:
            raise SerialException('expected a string in the form "[loop://][option[/option...]]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -

    def inWaiting(self):
        """Return the number of characters currently in the input buffer."""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            # attention the logged value can differ from return value in
            # threaded environments...
            self.logger.debug('inWaiting() -> %d' % (len(self.loop_buffer),))
        return len(self.loop_buffer)

    def read(self, size=1):
        """Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read."""
        if not self._isOpen: raise portNotOpenError
        if self._timeout is not None:
            timeout = time.time() + self._timeout
        else:
            timeout = None
        data = bytearray()
        while size > 0:
            self.buffer_lock.acquire()
            try:
                block = to_bytes(self.loop_buffer[:size])
                del self.loop_buffer[:size]
            finally:
                self.buffer_lock.release()
            data += block
            size -= len(block)
            # check for timeout now, after data has been read.
            # useful for timeout = 0 (non blocking) read
            if timeout and time.time() > timeout:
                break
        return bytes(data)

    def write(self, data):
        """Output the given string over the serial port. Can block if the
        connection is blocked. May raise SerialException if the connection is
        closed."""
        if not self._isOpen: raise portNotOpenError
        # ensure we're working with bytes
        data = to_bytes(data)
        # calculate aprox time that would be used to send the data
        time_used_to_send = 10.0*len(data) / self._baudrate
        # when a write timeout is configured check if we would be successful
        # (not sending anything, not even the part that would have time)
        if self._writeTimeout is not None and time_used_to_send > self._writeTimeout:
            time.sleep(self._writeTimeout) # must wait so that unit test succeeds
            raise writeTimeoutError
        self.buffer_lock.acquire()
        try:
            self.loop_buffer += data
        finally:
            self.buffer_lock.release()
        return len(data)

    def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('flushInput()')
        self.buffer_lock.acquire()
        try:
            del self.loop_buffer[:]
        finally:
            self.buffer_lock.release()

    def flushOutput(self):
        """Clear output buffer, aborting the current output and
        discarding all that is in the buffer."""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('flushOutput()')

    def sendBreak(self, duration=0.25):
        """Send break condition. Timed, returns to idle state after given
        duration."""
        if not self._isOpen: raise portNotOpenError

    def setBreak(self, level=True):
        """Set break: Controls TXD. When active, to transmitting is
        possible."""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('setBreak(%r)' % (level,))

    def setRTS(self, level=True):
        """Set terminal status line: Request To Send"""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('setRTS(%r) -> state of CTS' % (level,))
        self.cts = level

    def setDTR(self, level=True):
        """Set terminal status line: Data Terminal Ready"""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('setDTR(%r) -> state of DSR' % (level,))
        self.dsr = level

    def getCTS(self):
        """Read terminal status line: Clear To Send"""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('getCTS() -> state of RTS (%r)' % (self.cts,))
        return self.cts

    def getDSR(self):
        """Read terminal status line: Data Set Ready"""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('getDSR() -> state of DTR (%r)' % (self.dsr,))
        return self.dsr

    def getRI(self):
        """Read terminal status line: Ring Indicator"""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('returning dummy for getRI()')
        return False

    def getCD(self):
        """Read terminal status line: Carrier Detect"""
        if not self._isOpen: raise portNotOpenError
        if self.logger:
            self.logger.info('returning dummy for getCD()')
        return True

    # - - - platform specific - - -
    # None so far


# assemble Serial class with the platform specific implementation and the base
# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
# library, derive from io.RawIOBase
try:
    import io
except ImportError:
    # classic version with our own file-like emulation
    class Serial(LoopbackSerial, FileLike):
        pass
else:
    # io library present
    class Serial(LoopbackSerial, io.RawIOBase):
        pass


# simple client test
if __name__ == '__main__':
    import sys
    s = Serial('loop://')
    sys.stdout.write('%s\n' % s)

    sys.stdout.write("write...\n")
    s.write("hello\n")
    s.flush()
    sys.stdout.write("read: %s\n" % s.read(5))

    s.close()