# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import socket import struct import btsocket SDP_HDR_FORMAT = '>BHH' SDP_HDR_SIZE = struct.calcsize(SDP_HDR_FORMAT) SDP_TID_CNT = 1 << 16 SDP_MAX_UUIDS_CNT = 12 SDP_BODY_CNT_FORMAT = '>HH' SDP_BODY_CNT_SIZE = struct.calcsize(SDP_BODY_CNT_FORMAT) BLUETOOTH_BASE_UUID = 0x0000000000001000800000805F9B34FB # Constants are taken from lib/sdp.h in BlueZ source. SDP_RESPONSE_TIMEOUT = 20 SDP_REQ_BUFFER_SIZE = 2048 SDP_RSP_BUFFER_SIZE = 65535 SDP_PDU_CHUNK_SIZE = 1024 SDP_PSM = 0x0001 SDP_UUID = 0x0001 SDP_DATA_NIL = 0x00 SDP_UINT8 = 0x08 SDP_UINT16 = 0x09 SDP_UINT32 = 0x0A SDP_UINT64 = 0x0B SDP_UINT128 = 0x0C SDP_INT8 = 0x10 SDP_INT16 = 0x11 SDP_INT32 = 0x12 SDP_INT64 = 0x13 SDP_INT128 = 0x14 SDP_UUID_UNSPEC = 0x18 SDP_UUID16 = 0x19 SDP_UUID32 = 0x1A SDP_UUID128 = 0x1C SDP_TEXT_STR_UNSPEC = 0x20 SDP_TEXT_STR8 = 0x25 SDP_TEXT_STR16 = 0x26 SDP_TEXT_STR32 = 0x27 SDP_BOOL = 0x28 SDP_SEQ_UNSPEC = 0x30 SDP_SEQ8 = 0x35 SDP_SEQ16 = 0x36 SDP_SEQ32 = 0x37 SDP_ALT_UNSPEC = 0x38 SDP_ALT8 = 0x3D SDP_ALT16 = 0x3E SDP_ALT32 = 0x3F SDP_URL_STR_UNSPEC = 0x40 SDP_URL_STR8 = 0x45 SDP_URL_STR16 = 0x46 SDP_URL_STR32 = 0x47 SDP_ERROR_RSP = 0x01 SDP_SVC_SEARCH_REQ = 0x02 SDP_SVC_SEARCH_RSP = 0x03 SDP_SVC_ATTR_REQ = 0x04 SDP_SVC_ATTR_RSP = 0x05 SDP_SVC_SEARCH_ATTR_REQ = 0x06 SDP_SVC_SEARCH_ATTR_RSP = 0x07 class BluetoothSDPSocketError(Exception): """Error raised for SDP-related issues with BluetoothSDPSocket.""" pass class BluetoothSDPSocket(btsocket.socket): """Bluetooth SDP Socket. BluetoothSDPSocket wraps the btsocket.socket() class to implement the necessary send and receive methods for the SDP protocol. """ def __init__(self): super(BluetoothSDPSocket, self).__init__(family=btsocket.AF_BLUETOOTH, type=socket.SOCK_SEQPACKET, proto=btsocket.BTPROTO_L2CAP) self.tid = 0 def gen_tid(self): """Generate new Transaction ID @return Transaction ID """ self.tid = (self.tid + 1) % SDP_TID_CNT return self.tid def connect(self, address): """Connect to device with the given address @param address: Bluetooth address. """ try: super(BluetoothSDPSocket, self).connect((address, SDP_PSM)) except btsocket.error as e: logging.error('Error connecting to %s: %s', address, e) raise BluetoothSDPSocketError('Error connecting to host: %s' % e) except btsocket.timeout as e: logging.error('Timeout connecting to %s: %s', address, e) raise BluetoothSDPSocketError('Timeout connecting to host: %s' % e) def send_request(self, code, tid, data, forced_pdu_size=None): """Send a request to the socket. @param code: Request code. @param tid: Transaction ID. @param data: Parameters as bytearray or str. @param forced_pdu_size: Use certain PDU size parameter instead of calculating actual length of sequence. @raise BluetoothSDPSocketError: if 'send' to the socket didn't succeed. """ size = len(data) if forced_pdu_size != None: size = forced_pdu_size msg = struct.pack(SDP_HDR_FORMAT, code, tid, size) + data length = self.send(msg) if length != len(msg): raise BluetoothSDPSocketError('Short write on socket') def recv_response(self): """Receive a single response from the socket. The response data is not parsed. Use settimeout() to set whether this method will block if there is no reply, return immediately or wait for a specific length of time before timing out and raising TimeoutError. @return tuple of (code, tid, data) @raise BluetoothSDPSocketError: if the received packet is too small or if size of the packet differs from size written in header """ # Read the response from the socket. response = self.recv(SDP_RSP_BUFFER_SIZE) if len(response) < SDP_HDR_SIZE: raise BluetoothSDPSocketError('Short read on socket') code, tid, length = struct.unpack_from(SDP_HDR_FORMAT, response) data = response[SDP_HDR_SIZE:] if length != len(data): raise BluetoothSDPSocketError('Short read on socket') return code, tid, data def send_request_and_wait(self, req_code, req_data, forced_pdu_size=None): """Send a request to the socket and wait for the response. The response data is not parsed. @param req_code: Request code. @param req_data: Parameters as bytearray or str. @param forced_pdu_size: Use certain PDU size parameter instead of calculating actual length of sequence. Use settimeout() to set whether this method will block if there is no reply, return immediately or wait for a specific length of time before timing out and raising TimeoutError. @return tuple of (rsp_code, data) @raise BluetoothSDPSocketError: if Transaction ID of the response doesn't match to Transaction ID sent in request """ req_tid = self.gen_tid() self.send_request(req_code, req_tid, req_data, forced_pdu_size) rsp_code, rsp_tid, rsp_data = self.recv_response() if req_tid != rsp_tid: raise BluetoothSDPSocketError("Transaction IDs for request and " "response don't match") return rsp_code, rsp_data def _pack_list(self, data_element_list): """Preappend a list with required header. Size of the header is chosen to be minimal possible. @param data_element_list: List to be packed. @return packed list as a str @raise BluetoothSDPSocketError: if size of the list is larger than or equal to 2^32 bytes, which is not supported in SDP transactions """ size = len(data_element_list) if size < (1 << 8): header = struct.pack('>BB', SDP_SEQ8, size) elif size < (1 << 16): header = struct.pack('>BH', SDP_SEQ16, size) elif size < (1 << 32): header = struct.pack('>BI', SDP_SEQ32, size) else: raise BluetoothSDPSocketError('List is too long') return header + data_element_list def _pack_uuids(self, uuids, preferred_size): """Pack a list of UUIDs to a binary sequence @param uuids: List of UUIDs (as integers). @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128). @return packed list as a str @raise BluetoothSDPSocketError: the given preferred size is not supported by SDP """ if preferred_size not in (16, 32, 128): raise BluetoothSDPSocketError('Unsupported UUID size: %d; ' 'Supported values are: 16, 32, 128' % preferred_size) res = '' for uuid in uuids: # Fall back to 128 bits if the UUID doesn't fit into preferred_size. if uuid >= (1 << preferred_size) or preferred_size == 128: uuid128 = uuid if uuid < (1 << 32): uuid128 = (uuid128 << 96) + BLUETOOTH_BASE_UUID packed_uuid = struct.pack('>BQQ', SDP_UUID128, uuid128 >> 64, uuid128 & ((1 << 64) - 1)) elif preferred_size == 16: packed_uuid = struct.pack('>BH', SDP_UUID16, uuid) elif preferred_size == 32: packed_uuid = struct.pack('>BI', SDP_UUID32, uuid) res += packed_uuid res = self._pack_list(res) return res def _unpack_uuids(self, response): """Unpack SDP response @param response: body of raw SDP response. @return tuple of (uuids, cont_state) """ total_cnt, cur_cnt = struct.unpack_from(SDP_BODY_CNT_FORMAT, response) scanned = SDP_BODY_CNT_SIZE uuids = [] for i in range(cur_cnt): uuid, = struct.unpack_from('>I', response, scanned) uuids.append(uuid) scanned += 4 cont_state = response[scanned:] return uuids, cont_state def _unpack_error_code(self, response): """Unpack Error Code from SDP error response @param response: Body of raw SDP response. @return Error Code as int """ error_code, = struct.unpack_from('>H', response) return error_code def service_search_request(self, uuids, max_rec_cnt, preferred_size=32, forced_pdu_size=None, invalid_request=False): """Send a Service Search Request @param uuids: List of UUIDs (as integers) to look for. @param max_rec_cnt: Maximum count of returned service records. @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128). @param forced_pdu_size: Use certain PDU size parameter instead of calculating actual length of sequence. @param invalid_request: Whether to send request with intentionally invalid syntax for testing purposes (bool flag). @return list of found services' service record handles or Error Code @raise BluetoothSDPSocketError: arguments do not match the SDP restrictions or if the response has an incorrect code """ if max_rec_cnt < 1 or max_rec_cnt > 65535: raise BluetoothSDPSocketError('MaximumServiceRecordCount must be ' 'between 1 and 65535, inclusive') if len(uuids) > SDP_MAX_UUIDS_CNT: raise BluetoothSDPSocketError('Too many UUIDs') pattern = self._pack_uuids(uuids, preferred_size) + struct.pack( '>H', max_rec_cnt) cont_state = '\0' handles = [] while True: request = pattern + cont_state # Request without any continuation state is an example of invalid # request syntax. if invalid_request: request = pattern code, response = self.send_request_and_wait( SDP_SVC_SEARCH_REQ, request, forced_pdu_size) if code == SDP_ERROR_RSP: return self._unpack_error_code(response) if code != SDP_SVC_SEARCH_RSP: raise BluetoothSDPSocketError('Incorrect response code') cur_list, cont_state = self._unpack_uuids(response) handles.extend(cur_list) if cont_state == '\0': break return handles def _pack_attr_ids(self, attr_ids): """Pack a list of Attribute IDs to a binary sequence @param attr_ids: List of Attribute IDs. @return packed list as a str @raise BluetoothSDPSocketError: if list of UUIDs after packing is larger than or equal to 2^32 bytes """ attr_ids.sort() res = '' for attr_id in attr_ids: # Each element could be either a single Attribute ID or # a range of IDs. if isinstance(attr_id, list): packed_attr_id = struct.pack('>BHH', SDP_UINT32, attr_id[0], attr_id[1]) else: packed_attr_id = struct.pack('>BH', SDP_UINT16, attr_id) res += packed_attr_id res = self._pack_list(res) return res def _unpack_int(self, data, cnt): """Unpack an unsigned integer of cnt bytes @param data: raw data to be parsed @param cnt: size of integer @return unsigned integer """ res = 0 for i in range(cnt): res = (res << 8) | ord(data[i]) return res def _unpack_sdp_data_element(self, data): """Unpack a data element from a raw response @param data: raw data to be parsed @return tuple (result, scanned bytes) """ header, = struct.unpack_from('>B', data) data_type = header >> 3 data_size = header & 7 scanned = 1 data = data[1:] if data_type == 0: if data_size != 0: raise BluetoothSDPSocketError('Invalid size descriptor') return None, scanned elif data_type <= 3 or data_type == 5: if (data_size > 4 or data_type == 3 and (data_size == 0 or data_size == 3) or data_type == 5 and data_size != 0): raise BluetoothSDPSocketError('Invalid size descriptor') int_size = 1 << data_size res = self._unpack_int(data, int_size) # Consider negative integers. if data_type == 2 and (ord(data[0]) & 128) != 0: res = res - (1 << (int_size * 8)) # Consider booleans. if data_type == 5: res = res != 0 scanned += int_size return res, scanned elif data_type == 4 or data_type == 8: if data_size < 5 or data_size > 7: raise BluetoothSDPSocketError('Invalid size descriptor') int_size = 1 << (data_size - 5) str_size = self._unpack_int(data, int_size) res = data[int_size : int_size + str_size] scanned += int_size + str_size return res, scanned elif data_type == 6 or data_type == 7: if data_size < 5 or data_size > 7: raise BluetoothSDPSocketError('Invalid size descriptor') int_size = 1 << (data_size - 5) total_size = self._unpack_int(data, int_size) data = data[int_size:] scanned += int_size + total_size res = [] cur_size = 0 while cur_size < total_size: elem, elem_size = self._unpack_sdp_data_element(data) res.append(elem) data = data[elem_size:] cur_size += elem_size return res, scanned else: raise BluetoothSDPSocketError('Invalid size descriptor') def service_attribute_request(self, handle, max_attr_byte_count, attr_ids, forced_pdu_size=None, invalid_request=None): """Send a Service Attribute Request @param handle: service record from which attribute values are to be retrieved. @param max_attr_byte_count: maximum number of bytes of attribute data to be returned in the response to this request. @param attr_ids: a list, where each element is either an attribute ID or a range of attribute IDs. @param forced_pdu_size: Use certain PDU size parameter instead of calculating actual length of sequence. @param invalid_request: Whether to send request with intentionally invalid syntax for testing purposes (string with raw request). @return list of found attributes IDs and their values or Error Code @raise BluetoothSDPSocketError: arguments do not match the SDP restrictions or if the response has an incorrect code """ if max_attr_byte_count < 7 or max_attr_byte_count > 65535: raise BluetoothSDPSocketError('MaximumAttributeByteCount must be ' 'between 7 and 65535, inclusive') pattern = (struct.pack('>I', handle) + struct.pack('>H', max_attr_byte_count) + self._pack_attr_ids(attr_ids)) cont_state = '\0' complete_response = '' while True: request = (invalid_request if invalid_request else pattern + cont_state) code, response = self.send_request_and_wait( SDP_SVC_ATTR_REQ, request, forced_pdu_size) if code == SDP_ERROR_RSP: return self._unpack_error_code(response) if code != SDP_SVC_ATTR_RSP: raise BluetoothSDPSocketError('Incorrect response code') response_byte_count, = struct.unpack_from('>H', response) if response_byte_count > max_attr_byte_count: raise BluetoothSDPSocketError('AttributeListByteCount exceeds' 'MaximumAttributeByteCount') response = response[2:] complete_response += response[:response_byte_count] cont_state = response[response_byte_count:] if cont_state == '\0': break id_values_list = self._unpack_sdp_data_element(complete_response)[0] if len(id_values_list) % 2 == 1: raise BluetoothSDPSocketError('Length of returned list is odd') return id_values_list def service_search_attribute_request(self, uuids, max_attr_byte_count, attr_ids, preferred_size=32, forced_pdu_size=None, invalid_request=None): """Send a Service Search Attribute Request @param uuids: list of UUIDs (as integers) to look for. @param max_attr_byte_count: maximum number of bytes of attribute data to be returned in the response to this request. @param attr_ids: a list, where each element is either an attribute ID or a range of attribute IDs. @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128). @param forced_pdu_size: Use certain PDU size parameter instead of calculating actual length of sequence. @param invalid_request: Whether to send request with intentionally invalid syntax for testing purposes (string to be prepended to correct request). @return list of found attributes IDs and their values or Error Code @raise BluetoothSDPSocketError: arguments do not match the SDP restrictions or if the response has an incorrect code """ if len(uuids) > SDP_MAX_UUIDS_CNT: raise BluetoothSDPSocketError('Too many UUIDs') if max_attr_byte_count < 7 or max_attr_byte_count > 65535: raise BluetoothSDPSocketError('MaximumAttributeByteCount must be ' 'between 7 and 65535, inclusive') pattern = (self._pack_uuids(uuids, preferred_size) + struct.pack('>H', max_attr_byte_count) + self._pack_attr_ids(attr_ids)) cont_state = '\0' complete_response = '' while True: request = pattern + cont_state if invalid_request: request = invalid_request + request code, response = self.send_request_and_wait( SDP_SVC_SEARCH_ATTR_REQ, request, forced_pdu_size) if code == SDP_ERROR_RSP: return self._unpack_error_code(response) if code != SDP_SVC_SEARCH_ATTR_RSP: raise BluetoothSDPSocketError('Incorrect response code') response_byte_count, = struct.unpack_from('>H', response) if response_byte_count > max_attr_byte_count: raise BluetoothSDPSocketError('AttributeListByteCount exceeds' 'MaximumAttributeByteCount') response = response[2:] complete_response += response[:response_byte_count] cont_state = response[response_byte_count:] if cont_state == '\0': break id_values_list = self._unpack_sdp_data_element(complete_response)[0] return id_values_list