# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This file defines helper functions for putting entries into elasticsearch.
"""Utils for sending metadata to elasticsearch
Elasticsearch is a key-value store NOSQL database.
Source is here: https://github.com/elasticsearch/elasticsearch
We will be using es to store our metadata.
For example, if we wanted to store the following metadata:
metadata = {
'host_id': 1,
'job_id': 20,
'time_start': 100000,
'time_recorded': 100006,
}
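The following call will send metadata to the es server the object was
created for; type_str sets the _type of the stored entry.
es_utils.ESMetadata(use_http, host, port, index, udp_port).post(
type_str, metadata)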
We can also specify which port and host to use.
Using for testing: Sometimes, when we choose a single index
to put entries into, we want to clear that index of all
entries before running our tests. Use clear_index function.
(see es_utils_functionaltest.py for an example)
This file also contains methods for sending queries to es. Currently,
the query (json dict) we send to es is quite complicated (but flexible).
We've included several methods that compose queries that would be useful.
These methods are all named create_*_query()
For example, the below query returns job_id, host_id, and job_start
for all job_ids in [0, 99999] and host_id matching 10.
range_eq_query = {
'fields': ['job_id', 'host_id', 'job_start'],
'query': {
'filtered': {
'query': {
'match': {
'host_id': 10,
}
}
'filter': {
'range': {
'job_id': {
'gte': 0,
'lte': 99999,
}
}
}
}
}
}
To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server.
"""
import collections
import json
import logging
import socket
import time
try:
import elasticsearch
from elasticsearch import helpers as elasticsearch_helpers
except ImportError:
logging.debug('Failed to import elasticsearch. Mock classes will be used '
'and calls to Elasticsearch server will be no-op. Test run '
'is not affected by the missing elasticsearch module.')
import elasticsearch_mock as elasticsearch
elasticsearch_helpers = elasticsearch.Elasticsearch()
# Default timeout handed to the Elasticsearch client when connecting to esdb.
DEFAULT_TIMEOUT = 30
# Default number of results requested by a query.
DEFAULT_RESULT_SIZE = 10000
# Default `size` handed to the scan helper when scrolling query results.
DEFAULT_SCROLL_SIZE = 50000
class EsUtilException(Exception):
    """Error raised when one of the helpers in this module fails."""
# Result of a query: `total` is the number of matched records, `hits` is the
# list of converted hit dictionaries.
QueryResult = collections.namedtuple('QueryResult', 'total hits')
class ESMetadata(object):
    """Class handling es connection for metadata."""

    def __init__(self, use_http, host, port, index, udp_port,
                 timeout=DEFAULT_TIMEOUT):
        """Initialize ESMetadata object.

        @param use_http: Whether to send data to ES using HTTP.
        @param host: Elasticsearch host.
        @param port: Elasticsearch port.
        @param index: What index the metadata is stored in.
        @param udp_port: What port to use for UDP data.
        @param timeout: How long to wait while connecting to es.
        """
        self.use_http = use_http
        self.host = host
        self.port = port
        self.index = index
        self.udp_port = udp_port
        self.timeout = timeout
        # The client is created on first access of the `es` property.
        self._es = None

    @property
    def es(self):
        """Read only property, lazily initialized."""
        if self._es is None:
            self._es = elasticsearch.Elasticsearch(host=self.host,
                                                   port=self.port,
                                                   timeout=self.timeout)
        return self._es

    def _send_data_http(self, type_str, metadata):
        """Sends data to insert into elasticsearch using HTTP.

        @param type_str: sets the _type field in elasticsearch db.
        @param metadata: dictionary object containing metadata

        @return: True if the entry was indexed, False if Elasticsearch raised
                 an error (the error is logged, not propagated).
        """
        try:
            self.es.index(index=self.index, doc_type=type_str, body=metadata)
        except elasticsearch.ElasticsearchException as e:
            # Mute exceptions from metadata reporting to prevent meta data
            # reporting errors from killing test.
            logging.error(e)
            return False
        return True

    def _send_data_udp(self, type_str, metadata):
        """Sends data to insert into elasticsearch using UDP.

        The datagram consists of a header line naming the index and type,
        followed by a line holding the metadata, each terminated by a newline.

        @param type_str: sets the _type field in elasticsearch db.
        @param metadata: dictionary object containing metadata

        @return: True if the datagram was handed to the socket, False if a
                 socket error occurred (the error is logged, not propagated).
        """
        header = json.dumps(
                {'index': {'_index': self.index, '_type': type_str}},
                separators=(', ', ' : '))
        body = json.dumps(metadata, separators=(', ', ' : '))
        message = header + '\n' + body + '\n'
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # json.dumps emits ASCII by default, so encoding is safe and
                # gives sendto the bytes it requires on Python 3.
                sock.sendto(message.encode('utf-8'),
                            (self.host, self.udp_port))
            finally:
                # Always release the socket, even if sendto fails.
                sock.close()
        except socket.error as e:
            logging.warning(e)
            return False
        return True

    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
        """Inserts a single entry into elasticsearch.

        Sends the entry over HTTP or UDP depending on self.use_http.

        @param type_str: Sets the _type field in elasticsearch db.
        @param metadata: Dictionary object containing metadata. It is copied,
                         the caller's dictionary is not modified. If it is
                         empty nothing is sent.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
                                  Any existing 'time_recorded' value is
                                  overwritten.
        @param kwargs: Additional metadata fields

        @return: True if post action succeeded. Otherwise return False.
        """
        if not metadata:
            return True

        metadata = metadata.copy()
        metadata.update(kwargs)
        # A '_type' key must not be stored in the entry itself; when present
        # it overrides type_str instead.
        type_str = metadata.pop('_type', type_str)
        if log_time_recorded:
            metadata['time_recorded'] = time.time()
        try:
            if self.use_http:
                return self._send_data_http(type_str, metadata)
            return self._send_data_udp(type_str, metadata)
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False

    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
        """Inserts a list of entries into elasticsearch with one bulk call.

        @param data_list: A list of dictionary objects containing metadata.
                          Each dictionary is copied before being modified.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
                                  An entry that already has 'time_recorded'
                                  keeps its value.
        @param kwargs: Additional metadata fields

        @return: True if post action succeeded. Otherwise return False.
        """
        if not data_list:
            return True

        actions = []
        for metadata in data_list:
            action = metadata.copy()
            action.update(kwargs)
            if log_time_recorded and 'time_recorded' not in action:
                action['time_recorded'] = time.time()
            action['_index'] = self.index
            actions.append(action)

        try:
            elasticsearch_helpers.bulk(self.es, actions)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False

    def _compose_query(self, equality_constraints=None, fields_returned=None,
                       range_constraints=None, size=DEFAULT_RESULT_SIZE,
                       sort_specs=None, regex_constraints=None,
                       batch_constraints=None):
        """Creates a dict. representing multiple range and/or equality queries.

        Example input:
        _compose_query(
            fields_returned = ['time_recorded', 'hostname',
                               'status', 'dbg_str'],
            equality_constraints = [
                ('_type', 'host_history'),
                ('hostname', '172.22.169.106'),
            ],
            range_constraints = [
                ('time_recorded', 1405628341.904379, 1405700341.904379)
            ],
            size=20,
            sort_specs=[
                'hostname',
                {'time_recorded': 'asc'},
            ]
        )

        Output:
        {
            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
            'query': {
                'bool': {
                    'must': [
                        {
                            'term': {
                                '_type': 'host_history'
                            }
                        },
                        {
                            'term': {
                                'hostname': '172.22.169.106'
                            }
                        },
                        {
                            'range': {
                                'time_recorded': {
                                    'gte': 1405628341.904379,
                                    'lte': 1405700341.904379
                                }
                            }
                        }
                    ]
                },
            },
            'size': 20,
            'sort': [
                'hostname',
                { 'time_recorded': 'asc'},
            ]
        }

        @param equality_constraints: list of tuples of (field, value) pairs
            representing what each field should equal to in the query.
            e.g. [ ('field1', 1), ('field2', 'value') ]
        @param fields_returned: list of fields that we should return when
            the query is executed. Set it to None to return all fields. Note
            that the key/vals will be stored in _source key of the hit object,
            if fields_returned is set to None.
        @param range_constraints: list of tuples of (field, low, high) pairs
            representing what each field should be between (inclusive).
            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
            If you want one side to be unbounded, you can use None.
            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
            A tuple with both sides set to None is ignored.
        @param size: max number of entries to return. Default is
            DEFAULT_RESULT_SIZE.
        @param sort_specs: A list of fields to sort on, tiebreakers will be
            broken by the next field(s).
        @param regex_constraints: A list of regex constraints of tuples of
            (field, value) pairs, e.g., [('field1', '.*value.*')].
        @param batch_constraints: list of tuples of (field, list) pairs
            representing each field should be equal to one of the values
            in the list.
            e.g., [ ('job_id', [10, 11, 12, 13]) ]

        @returns: dictionary object that represents query to es.
        @raises EsUtilException: if there are no equality constraints and no
            range constraints. Regex and batch constraints on their own do
            not satisfy this check.
        """
        # None defaults avoid sharing mutable lists between calls.
        equality_constraints = equality_constraints or []
        range_constraints = range_constraints or []
        regex_constraints = regex_constraints or []
        batch_constraints = batch_constraints or []

        if not equality_constraints and not range_constraints:
            raise EsUtilException('No range or equality constraints specified.')

        # Creates list of range dictionaries to put in the 'must' list.
        range_list = []
        for key, low, high in range_constraints:
            if low is None and high is None:
                continue
            bounds = {}
            if low is not None:
                bounds['gte'] = low
            if high is not None:
                bounds['lte'] = high
            range_list.append({'range': {key: bounds}})

        # Creates the list of term dictionaries to put in the 'must' list.
        # Constraints with an empty field name are dropped.
        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
        constraints = eq_list + batch_list + range_list + regex_list

        query = {
            'query': {
                'bool': {
                    'must': constraints,
                }
            },
        }
        if fields_returned:
            query['fields'] = fields_returned
        query['size'] = size
        if sort_specs:
            query['sort'] = sort_specs
        return query

    def execute_query(self, query):
        """Makes a query on the given index.

        @param query: query dictionary (see _compose_query)
        @returns: A QueryResult instance describing the result, or None if
                  the index does not exist. QueryResult.total is the number
                  of matched records; QueryResult.hits is a list with one
                  dictionary per hit, taken from the hit's 'fields' (single
                  element lists are unwrapped) or, when the hit has no
                  'fields', copied from its '_source'.

        Example of the raw response from es that the result is built from
        (shortened to two hits):
        {
            "took" : 5,
            "timed_out" : false,
            "_shards" : {
                "total" : 16,
                "successful" : 16,
                "failed" : 0
            },
            "hits" : {
                "total" : 4,
                "max_score" : 1.0,
                "hits" : [ {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 1,
                               "job_id": 22,
                               "time_start": 400}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfgfddddddddddddddddddddddhhh",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 2,
                               "job_id": 23,
                               "time_start": 405}
                } ]
            }
        }
        """
        if not self.es.indices.exists(index=self.index):
            logging.error('Index (%s) does not exist on %s:%s',
                          self.index, self.host, self.port)
            return None
        result = self.es.search(index=self.index, body=query)
        # Check if all matched records are returned. It could be the size is
        # set too small. Special case for size set to 1, as that means that
        # the query cares about the first matched entry.
        # TODO: Use pagination in Elasticsearch. This needs major change on how
        #       query results are iterated.
        size = query.get('size', 1)
        need_scroll = 'size' in query and size == DEFAULT_RESULT_SIZE
        return_count = len(result['hits']['hits'])
        total_match = result['hits']['total']
        if total_match > return_count and need_scroll:
            logging.warning('There are %d matched records, only %d entries are '
                            'returned. Query size is set to %d. Will try to use '
                            'scroll command to get all entries.', total_match,
                            return_count, size)
            # Try to get all results with scroll.
            hits = self._get_results_by_scan(query, total_match)
        else:
            hits = result['hits']['hits']
        # Extract the actual results from the query.
        output = QueryResult(total_match, [])
        for hit in hits:
            if 'fields' in hit:
                converted = {}
                for key, value in hit['fields'].items():
                    converted[key] = value[0] if len(value) == 1 else value
            else:
                converted = hit['_source'].copy()
            output.hits.append(converted)
        return output

    def _get_results_by_scan(self, query, total_match=None):
        """Get all results by using scan.

        The sort (if any) is removed from the query before scanning and is
        applied locally to the downloaded hits instead.

        @param query: query dictionary (see _compose_query). The dictionary
                      is copied; the caller's object is not modified.
        @param total_match: The number of total matched results. Pass the value
                in so the code doesn't need to run another query to get it.

        @returns: A list of matched results.
        @raises EsUtilException: if the query sorts on more than one key.
        """
        # Work on a shallow copy so the caller's query keeps its size and sort.
        query = dict(query)
        if not total_match:
            # Reduce the return size to make the query run faster.
            count_query = dict(query, size=1)
            result = self.es.search(index=self.index, body=count_query)
            total_match = result['hits']['total']
        # Remove the sort from query so scroll method can run faster.
        sort = query.pop('sort', None)
        if sort and len(sort) > 1:
            raise EsUtilException('_get_results_by_scan does not support '
                                  'sort with more than one key: %s' % (sort,))
        # The scan helper is given its own size below.
        query.pop('size', None)
        scroll = elasticsearch_helpers.scan(self.es, query=query,
                                            index=self.index,
                                            size=DEFAULT_SCROLL_SIZE)
        hits = []
        next_mark = 0
        for hit in scroll:
            hits.append(hit)
            # Progress logging only; skipped when the total is unknown or 0
            # to avoid dividing by zero.
            if total_match:
                downloaded_percent = 100 * float(len(hits)) / total_match
                if downloaded_percent > next_mark:
                    logging.debug('%2.0f%% downloaded (%d)',
                                  downloaded_percent, len(hits))
                    next_mark += 5
        logging.debug('Number of hits found: %s', len(hits))

        if sort:
            logging.debug('Sort hits with rule: %s', sort)
            spec = sort[0]
            if isinstance(spec, dict):
                # e.g. {'time_recorded': 'asc'} or
                # {'time_recorded': {'order': 'desc'}}
                sort_key, order = list(spec.items())[0]
                if isinstance(order, dict):
                    order = order.get('order')
            else:
                # A bare field name carries no order; sort ascending.
                sort_key, order = spec, 'asc'
            is_desc = order == 'desc'

            def sort_value(hit):
                """Returns the value of the sort key for a hit."""
                # If the query has `fields` specified, the dict of hit stores
                # value in hit['fields'], otherwise, the keyvals are stored
                # in hit['_source'].
                if '_source' in hit:
                    return hit['_source'][sort_key]
                return hit['fields'][sort_key][0]

            hits = sorted(hits, key=sort_value, reverse=is_desc)

        return hits

    def query(self, *args, **kwargs):
        """Composes a query and executes it.

        The arguments to this function are the same as _compose_query.

        @returns: The value returned by execute_query for the composed query.
        """
        composed = self._compose_query(*args, **kwargs)
        return self.execute_query(composed)