#!/usr/bin/env python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import ctypes
import select
import xi2
import xlib
class XI2Reader(object):
    """A reader that connects to an X server and reads XInput2 raw events."""

    def __init__(self, display_name=':0'):
        """Opens the display and subscribes to raw input events.

        Args:
            display_name: The X window display name.

        Raises:
            RuntimeError: If the display cannot be opened.
        """
        self._display = xlib.XOpenDisplay(display_name)
        # XOpenDisplay yields a NULL display on failure; stop here instead of
        # handing NULL to the Xlib calls below.
        if not self._display:
            raise RuntimeError('Cannot open X display %r' % (display_name,))
        self._window = xlib.XDefaultRootWindow(self._display)
        self._data = []
        self._register()
        # Consumes the very first traffic within the connection with X server.
        xlib.XFlush(self._display)

    def _register(self):
        """Registers the devices and raw event types to listen on."""
        mask = xi2.XIEventMask()
        mask.deviceid = xi2.XIAllDevices
        # The mask is sized from XI_RawMotion; presumably this is the
        # highest-numbered event selected below -- verify against xi2.
        mask.mask_len = xi2.XIMaskLen(xi2.XI_RawMotion)
        mask_bytes = (ctypes.c_ubyte * mask.mask_len)()
        mask.mask = ctypes.cast(mask_bytes, ctypes.POINTER(ctypes.c_ubyte))

        for event in (xi2.XI_RawKeyPress,
                      xi2.XI_RawKeyRelease,
                      xi2.XI_RawButtonPress,
                      xi2.XI_RawButtonRelease,
                      xi2.XI_RawMotion):
            self._set_mask(mask.mask, event)

        xi2.XISelectEvents(self._display, self._window,
                           ctypes.pointer(mask), 1)
        # An empty core-event mask is selected on the same window.
        xlib.XSelectInput(self._display, self._window, ctypes.c_long(0))

    def _set_mask(self, ptr, event):
        """Sets the bit for |event| in the event mask that |ptr| points to.

        Args:
            ptr: Pointer to the mask bytes.
            event: The XI2 event type number to enable.
        """
        # The value returned by xi2.XISetMask is stored into the byte that
        # holds the event's bit (8 events per byte). Presumably the helper
        # returns the updated byte rather than modifying it in place.
        ptr[event >> 3] = xi2.XISetMask(ptr, event)

    def get_valuator_names(self, device_id):
        """Gets the valuator names for a device.

        Args:
            device_id: The id of the device to query.

        Return:
            A dictionary that maps each valuator's position among the device's
            valuator classes to its descriptive name.
            Sample output:
            {
                0: 'Rel X',
                1: 'Rel Y',
                2: 'Abs Start Timestamp',
                3: 'Abs End Timestamp',
                4: 'Rel Vert Wheel',
                5: 'Rel Horiz Wheel'
            }
        """
        num_devices = ctypes.c_int()
        # NOTE(review): the info returned by XIQueryDevice is never released
        # here; confirm whether xi2 exposes XIFreeDeviceInfo for that.
        device = xi2.XIQueryDevice(self._display, device_id,
                                   ctypes.pointer(num_devices)).contents
        valuator_names = []
        for i in range(device.num_classes):
            class_ptr = device.classes[i]
            if class_ptr.contents.type != xi2.XIValuatorClass:
                continue
            valuator_class_info = ctypes.cast(
                class_ptr, ctypes.POINTER(xi2.XIValuatorClassInfo)).contents
            # Use this instance's own display, not a module-level global.
            valuator_names.append(
                xlib.XGetAtomName(self._display, valuator_class_info.label))
        return dict(enumerate(valuator_names))

    def get_connection_number(self):
        """Gets the file descriptor number for the connection with X server."""
        return xlib.XConnectionNumber(self._display)

    def read_pending_events(self):
        """Reads all events currently pending on the connection.

        Return:
            A list with one dictionary per event, holding the device id,
            event type, timestamp and valuator values. Each valuator maps to
            a (value, raw value) tuple. Sample element:
            {
                'deviceid': 11,
                'evtype': 17,
                'time': 406752437,
                'valuators': {
                    0: (396.0, -38.0),
                    1: (578.0, -21.0),
                    2: (22802890.0, 22802890.0),
                    3: (26145746.0, 26145746.0)
                }
            }
        """
        data = []
        while xlib.XPending(self._display):
            xevent = xlib.XEvent()
            xlib.XNextEvent(self._display, ctypes.pointer(xevent))
            cookie = xevent.xcookie
            # Fetch the extension payload; skip events that carry none or
            # that are not generic events.
            # NOTE(review): data obtained through XGetEventData is normally
            # released with XFreeEventData; confirm whether xlib exposes it.
            if not xlib.XGetEventData(self._display, ctypes.pointer(cookie)):
                continue
            if cookie.type != xlib.GenericEvent:
                continue

            raw_event = ctypes.cast(
                cookie.data, ctypes.POINTER(xi2.XIRawEvent)).contents
            valuator_state = raw_event.valuators
            values = valuator_state.values
            raw_values = raw_event.raw_values

            # Both value arrays are packed: they hold one entry per set mask
            # bit, in bit order, so one running index serves both.
            valuator_data = {}
            packed_idx = 0
            # mask_len counts bytes, so the mask holds mask_len * 8 bits.
            for bit in range(valuator_state.mask_len * 8):
                if xi2.XIMaskIsSet(valuator_state.mask, bit):
                    valuator_data[bit] = (values[packed_idx],
                                          raw_values[packed_idx])
                    packed_idx += 1

            data.append({'deviceid': raw_event.deviceid,
                         'evtype': cookie.evtype,
                         'time': raw_event.time,
                         'valuators': valuator_data})
        return data
if __name__ == '__main__':
    # Demo: print every batch of raw input events as it arrives.
    reader = XI2Reader()
    fd = reader.get_connection_number()
    while True:
        # Block until the X connection has data to read.
        rl, _, _ = select.select([fd], [], [])
        if fd not in rl:
            break
        # Parenthesised single-argument form prints the same on Python 2 and
        # is also valid syntax on Python 3.
        print(reader.read_pending_events())