from autotest_lib.frontend.shared import exceptions class ConstraintError(Exception): """Raised when an error occurs applying a Constraint.""" class QueryProcessor(object): def __init__(self): # maps selector name to (selector, constraint) self._selectors = {} self._alias_counter = 0 def add_field_selector(self, name, field=None, value_transform=None, doc=None): if not field: field = name self.add_selector(Selector(name, doc), _FieldConstraint(field, value_transform)) def add_related_existence_selector(self, name, model, field, doc=None): self.add_selector( Selector(name, doc), _RelatedExistenceConstraint(model, field, make_alias_fn=self.make_alias)) def add_keyval_selector(self, name, model, key_field, value_field, doc=None): self.add_selector( Selector(name, doc), _KeyvalConstraint(model, key_field, value_field, make_alias_fn=self.make_alias)) def add_selector(self, selector, constraint): if self._selectors is None: self._selectors = {} self._selectors[selector.name] = (selector, constraint) def make_alias(self): self._alias_counter += 1 return 'alias%s' % self._alias_counter def selectors(self): return tuple(selector for selector, constraint in self._selectors.itervalues()) def has_selector(self, selector_name): return selector_name in self._selectors def apply_selector(self, queryset, selector_name, value, comparison_type=None, is_inverse=False): if comparison_type is None: comparison_type = 'equals' _, constraint = self._selectors[selector_name] try: return constraint.apply_constraint(queryset, value, comparison_type, is_inverse) except ConstraintError, exc: raise exceptions.BadRequest('Selector %s: %s' % (selector_name, exc)) # common value conversions def read_boolean(self, boolean_input): if boolean_input.lower() == 'true': return True if boolean_input.lower() == 'false': return False raise exceptions.BadRequest('Invalid input for boolean: %r' % boolean_input) class Selector(object): def __init__(self, name, doc): self.name = name self.doc = doc class Constraint(object): def apply_constraint(self, queryset, value, comparison_type, is_inverse): raise NotImplementedError class _FieldConstraint(Constraint): def __init__(self, field, value_transform=None): self._field = field self._value_transform = value_transform _COMPARISON_MAP = { 'equals': 'exact', 'lt': 'lt', 'le': 'lte', 'gt': 'gt', 'ge': 'gte', 'contains': 'contains', 'startswith': 'startswith', 'endswith': 'endswith', 'in': 'in', } def apply_constraint(self, queryset, value, comparison_type, is_inverse): if self._value_transform: value = self._value_transform(value) kwarg_name = str(self._field + '__' + self._COMPARISON_MAP[comparison_type]) if comparison_type == 'in': value = value.split(',') if is_inverse: return queryset.exclude(**{kwarg_name: value}) else: return queryset.filter(**{kwarg_name: value}) class _RelatedExistenceConstraint(Constraint): def __init__(self, model, field, make_alias_fn): self._model = model self._field = field self._make_alias_fn = make_alias_fn def apply_constraint(self, queryset, value, comparison_type, is_inverse): if comparison_type not in (None, 'equals'): raise ConstraintError('Can only use equals or not equals with ' 'this selector') related_query = self._model.objects.filter(**{self._field: value}) if not related_query: raise ConstraintError('%s %s not found' % (self._model_name, value)) alias = self._make_alias_fn() queryset = queryset.model.objects.join_custom_field(queryset, related_query, alias) if is_inverse: condition = '%s.%s IS NULL' else: condition = '%s.%s IS NOT NULL' condition %= (alias, queryset.model.objects.key_on_joined_table(related_query)) queryset = queryset.model.objects.add_where(queryset, condition) return queryset class _KeyvalConstraint(Constraint): def __init__(self, model, key_field, value_field, make_alias_fn): self._model = model self._key_field = key_field self._value_field = value_field self._make_alias_fn = make_alias_fn def apply_constraint(self, queryset, value, comparison_type, is_inverse): if comparison_type not in (None, 'equals'): raise ConstraintError('Can only use equals or not equals with ' 'this selector') if '=' not in value: raise ConstraintError('You must specify a key=value pair for this ' 'selector') key, actual_value = value.split('=', 1) related_query = self._model.objects.filter( **{self._key_field: key, self._value_field: actual_value}) alias = self._make_alias_fn() queryset = queryset.model.objects.join_custom_field(queryset, related_query, alias) if is_inverse: condition = '%s.%s IS NULL' else: condition = '%s.%s IS NOT NULL' condition %= (alias, queryset.model.objects.key_on_joined_table(related_query)) queryset = queryset.model.objects.add_where(queryset, condition) return queryset