普通文本  |  150行  |  5.08 KB

#!/usr/bin/env python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import ctypes
import select

import xi2
import xlib


class XI2Reader(object):
    """A reader that connects to the X server and reads XInput2 raw events."""

    def __init__(self, display_name=':0'):
        """Opens the display, registers for raw events and flushes the request.

        Args:
            display_name: The X window display name.

        Raises:
            RuntimeError: If the display cannot be opened.
        """
        self._display = xlib.XOpenDisplay(display_name)
        # A NULL display would otherwise be handed to every later Xlib call.
        if not self._display:
            raise RuntimeError('Cannot open X display %r' % (display_name,))
        self._window = xlib.XDefaultRootWindow(self._display)
        self._data = []

        self._register()

        # Consumes the very first traffic within the connection with X server.
        xlib.XFlush(self._display)

    def _register(self):
        """Registers the devices and raw event types to listen on.

        Selects raw key, raw button and raw motion events from all devices on
        the root window, and passes an empty (0) core event mask.
        """
        mask = xi2.XIEventMask()
        mask.deviceid = xi2.XIAllDevices
        # The mask buffer is sized so that the XI_RawMotion bit fits in it.
        mask.mask_len = xi2.XIMaskLen(xi2.XI_RawMotion)
        mask.mask = ctypes.cast((ctypes.c_ubyte * mask.mask_len)(),
                                ctypes.POINTER(ctypes.c_ubyte))

        for event in (xi2.XI_RawKeyPress,
                      xi2.XI_RawKeyRelease,
                      xi2.XI_RawButtonPress,
                      xi2.XI_RawButtonRelease,
                      xi2.XI_RawMotion):
            self._set_mask(mask.mask, event)

        xi2.XISelectEvents(self._display, self._window, ctypes.pointer(mask), 1)
        xlib.XSelectInput(self._display, self._window, ctypes.c_long(0))

    def _set_mask(self, ptr, event):
        """Sets the bit for `event` in the mask buffer `ptr`.

        Args:
            ptr: Pointer to the unsigned-byte mask buffer.
            event: The event type number whose bit should be set.
        """
        # The value returned by XISetMask is stored into the byte that holds
        # the event's bit (eight events per byte, hence event >> 3).
        val = xi2.XISetMask(ptr, event)
        ptr[event >> 3] = val

    def get_valuator_names(self, device_id):
        """Gets the valuator names for device.

        Args:
            device_id: The id of the device to query.

        Return:
            A dictionary mapping valuator index to descriptive names.
            Sample output:
            {
                0: 'Rel X',
                1: 'Rel Y',
                2: 'Abs Start Timestamp',
                3: 'Abs End Timestamp',
                4: 'Rel Vert Wheel',
                5: 'Rel Horiz Wheel'
            }
        """
        num_devices = ctypes.c_int()
        # NOTE(review): the device info is never released; Xlib normally
        # expects XIFreeDeviceInfo here -- confirm the xi2 wrapper exposes it.
        device = xi2.XIQueryDevice(self._display, device_id,
                                   ctypes.pointer(num_devices)).contents

        valuator_names = []
        for i in range(device.num_classes):
            device_class = device.classes[i]
            if device_class.contents.type != xi2.XIValuatorClass:
                continue
            valuator_class_info = ctypes.cast(
                    device_class,
                    ctypes.POINTER(xi2.XIValuatorClassInfo)).contents
            # Use this instance's display, not a module-level global.
            valuator_names.append(xlib.XGetAtomName(
                    self._display, valuator_class_info.label))

        # Indices follow the order in which valuator classes were found.
        return dict(enumerate(valuator_names))

    def get_connection_number(self):
        """Gets the file descriptor number for the connection with X server"""
        return xlib.XConnectionNumber(self._display)

    def read_pending_events(self):
        """Reads all the pending event data.

        Events for which XGetEventData fails, or whose cookie type is not
        GenericEvent, are skipped.

        Return:
            A list with one dictionary per event, holding the device id, the
            event type, the timestamp and the valuator values keyed by
            valuator index as (value, raw value) tuples. Sample element:
            {
                'deviceid': 11,
                'evtype': 17,
                'time': 406752437L,
                'valuators': {
                    0: (396.0, -38.0),
                    1: (578.0, -21.0),
                    2: (22802890.0, 22802890.0),
                    3: (26145746.0, 26145746.0)
                }
            }
        """
        data = []
        while xlib.XPending(self._display):
            xevent = xlib.XEvent()
            xlib.XNextEvent(self._display, ctypes.pointer(xevent))
            cookie = xevent.xcookie

            # Get event valuator_data
            # NOTE(review): Xlib expects XFreeEventData after a successful
            # XGetEventData; not added here because it is unclear whether the
            # xlib wrapper exposes it -- confirm and release the cookie data.
            result = xlib.XGetEventData(self._display, ctypes.pointer(cookie))
            if not result or cookie.type != xlib.GenericEvent:
                continue

            raw_event = ctypes.cast(
                    cookie.data, ctypes.POINTER(xi2.XIRawEvent)).contents
            valuator_state = raw_event.valuators

            # Two value arrays, both packed: they only hold entries for the
            # valuators whose mask bit is set, so one running index serves both.
            values = valuator_state.values
            raw_values = raw_event.raw_values
            packed_idx = 0

            valuator_data = {}
            # mask_len counts bytes, so the mask holds mask_len * 8 bits.
            for i in range(valuator_state.mask_len * 8):
                if xi2.XIMaskIsSet(valuator_state.mask, i):
                    valuator_data[i] = (values[packed_idx],
                                        raw_values[packed_idx])
                    packed_idx += 1
            data.append({'deviceid': raw_event.deviceid,
                         'evtype': cookie.evtype,
                         'time': raw_event.time,
                         'valuators': valuator_data})
        return data


if __name__ == '__main__':
    # Demo: wait on the X connection's file descriptor and print every batch
    # of pending events; stop as soon as select() returns without that fd.
    reader = XI2Reader()
    fd = reader.get_connection_number()

    rl = select.select([fd], [], [])[0]
    while fd in rl:
        print(reader.read_pending_events())
        rl = select.select([fd], [], [])[0]