# Copyright 2015 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. r"""Rules parser. The input syntax is: [{"comment": ignored_value}, {"rule_class_name1": {"arg1": value, "arg2": value, ...}}, {"rule_class_name2": {"arg1": value, "arg2": value, ...}}, ...] E.g.: [{"comment": "this text is ignored"}, {"SendStatus": {"url": "example\\.com/ss.*", "status": 204}}, {"ModifyUrl": {"url": "(example\\.com)(/.*)", "new_url": "{1}"}} ] """ import json import re class Error(Exception): pass class Rules(object): """A parsed sequence of Rule objects.""" def __init__(self, file_obj=None, allowed_imports=None): """Initializes from the given file object. Args: file_obj: A file object. allowed_imports: A set of strings, defaults to {'rules'}. Use {'*'} to allow any import path. """ if allowed_imports is None: allowed_imports = {'rules'} self._rules = [] if file_obj is None else _Load(file_obj, allowed_imports) def Contains(self, rule_type_name): """Returns true if any rule matches the given type name. Args: rule_type_name: a string. Returns: True if any rule matches, else False. """ return any(rule for rule in self._rules if rule.IsType(rule_type_name)) def Find(self, rule_type_name): """Returns a _Rule object containing all rules with the given type name. Args: rule_type_name: a string. Returns: A callable object that expects two arguments: request: the httparchive ArchivedHttpRequest response: the httparchive ArchivedHttpResponse and returns the rule return_value of the first rule that returns should_stop == True, or the last rule's return_value if all rules returns should_stop == False. """ matches = [rule for rule in self._rules if rule.IsType(rule_type_name)] return _Rule(matches) def __str__(self): return _ToString(self._rules) def __repr__(self): return str(self) class _Rule(object): """Calls a sequence of Rule objects until one returns should_stop.""" def __init__(self, rules): self._rules = rules def __call__(self, request, response): """Calls the rules until one returns should_stop. Args: request: the httparchive ArchivedHttpRequest. response: the httparchive ArchivedHttpResponse, which may be None. Returns: The rule return_value of the first rule that returns should_stop == True, or the last rule's return_value if all rules return should_stop == False. """ return_value = None for rule in self._rules: should_stop, return_value = rule.ApplyRule( return_value, request, response) if should_stop: break return return_value def __str__(self): return _ToString(self._rules) def __repr__(self): return str(self) def _ToString(rules): """Formats a sequence of Rule objects into a string.""" return '[\n%s\n]' % '\n'.join('%s' % rule for rule in rules) def _Load(file_obj, allowed_imports): """Parses and evaluates all rules in the given file. Args: file_obj: a file object. allowed_imports: a sequence of strings, e.g.: {'rules'}. Returns: a list of rules. """ rules = [] entries = json.load(file_obj) if not isinstance(entries, list): raise Error('Expecting a list, not %s', type(entries)) for i, entry in enumerate(entries): if not isinstance(entry, dict): raise Error('%s: Expecting a dict, not %s', i, type(entry)) if len(entry) != 1: raise Error('%s: Expecting 1 item, not %d', i, len(entry)) name, args = next(entry.iteritems()) if not isinstance(name, basestring): raise Error('%s: Expecting a string TYPE, not %s', i, type(name)) if not re.match(r'(\w+\.)*\w+$', name): raise Error('%s: Expecting a classname TYPE, not %s', i, name) if name == 'comment': continue if not isinstance(args, dict): raise Error('%s: Expecting a dict ARGS, not %s', i, type(args)) fullname = str(name) if '.' not in fullname: fullname = 'rules.%s' % fullname modulename, classname = fullname.rsplit('.', 1) if '*' not in allowed_imports and modulename not in allowed_imports: raise Error('%s: Package %r is not in allowed_imports', i, modulename) module = __import__(modulename, fromlist=[classname]) clazz = getattr(module, classname) missing = {s for s in ('IsType', 'ApplyRule') if not hasattr(clazz, s)} if missing: raise Error('%s: %s lacks %s', i, clazz.__name__, ' and '.join(missing)) rule = clazz(**args) rules.append(rule) return rules