"""Test the various means of instantiating and invoking tools.""" import gzip, StringIO import time timeout = 0.2 import types from cherrypy.test import test test.prefer_parent_path() import cherrypy from cherrypy import tools europoundUnicode = u'\x80\xa3' def setup_server(): # Put check_access in a custom toolbox with its own namespace myauthtools = cherrypy._cptools.Toolbox("myauth") def check_access(default=False): if not getattr(cherrypy.request, "userid", default): raise cherrypy.HTTPError(401) myauthtools.check_access = cherrypy.Tool('before_request_body', check_access) def numerify(): def number_it(body): for chunk in body: for k, v in cherrypy.request.numerify_map: chunk = chunk.replace(k, v) yield chunk cherrypy.response.body = number_it(cherrypy.response.body) class NumTool(cherrypy.Tool): def _setup(self): def makemap(): m = self._merged_args().get("map", {}) cherrypy.request.numerify_map = m.items() cherrypy.request.hooks.attach('on_start_resource', makemap) def critical(): cherrypy.request.error_response = cherrypy.HTTPError(502).set_response critical.failsafe = True cherrypy.request.hooks.attach('on_start_resource', critical) cherrypy.request.hooks.attach(self._point, self.callable) tools.numerify = NumTool('before_finalize', numerify) # It's not mandatory to inherit from cherrypy.Tool. class NadsatTool: def __init__(self): self.ended = {} self._name = "nadsat" def nadsat(self): def nadsat_it_up(body): for chunk in body: chunk = chunk.replace("good", "horrorshow") chunk = chunk.replace("piece", "lomtick") yield chunk cherrypy.response.body = nadsat_it_up(cherrypy.response.body) nadsat.priority = 0 def cleanup(self): # This runs after the request has been completely written out. cherrypy.response.body = "razdrez" id = cherrypy.request.params.get("id") if id: self.ended[id] = True cleanup.failsafe = True def _setup(self): cherrypy.request.hooks.attach('before_finalize', self.nadsat) cherrypy.request.hooks.attach('on_end_request', self.cleanup) tools.nadsat = NadsatTool() def pipe_body(): cherrypy.request.process_request_body = False clen = int(cherrypy.request.headers['Content-Length']) cherrypy.request.body = cherrypy.request.rfile.read(clen) # Assert that we can use a callable object instead of a function. class Rotator(object): def __call__(self, scale): r = cherrypy.response r.collapse_body() r.body = [chr(ord(x) + scale) for x in r.body] cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator()) class Root: def index(self): return "Howdy earth!" index.exposed = True def euro(self): hooks = list(cherrypy.request.hooks['before_finalize']) hooks.sort() assert [x.callback.__name__ for x in hooks] == ['encode', 'gzip'] assert [x.priority for x in hooks] == [70, 80] yield u"Hello," yield u"world" yield europoundUnicode euro.exposed = True # Bare hooks def pipe(self): return cherrypy.request.body pipe.exposed = True pipe._cp_config = {'hooks.before_request_body': pipe_body} # Multiple decorators; include kwargs just for fun. # Note that encode must run before gzip. def decorated_euro(self, *vpath): yield u"Hello," yield u"world" yield europoundUnicode decorated_euro.exposed = True decorated_euro = tools.gzip(compress_level=6)(decorated_euro) decorated_euro = tools.encode(errors='ignore')(decorated_euro) root = Root() class TestType(type): """Metaclass which automatically exposes all functions in each subclass, and adds an instance of the subclass as an attribute of root. """ def __init__(cls, name, bases, dct): type.__init__(name, bases, dct) for value in dct.itervalues(): if isinstance(value, types.FunctionType): value.exposed = True setattr(root, name.lower(), cls()) class Test(object): __metaclass__ = TestType # METHOD ONE: # Declare Tools in _cp_config class Demo(Test): _cp_config = {"tools.nadsat.on": True} def index(self, id=None): return "A good piece of cherry pie" def ended(self, id): return repr(tools.nadsat.ended[id]) def err(self, id=None): raise ValueError() def errinstream(self, id=None): raise ValueError() yield "confidential" # METHOD TWO: decorator using Tool() # We support Python 2.3, but the @-deco syntax would look like this: # @tools.check_access() def restricted(self): return "Welcome!" restricted = myauthtools.check_access()(restricted) userid = restricted def err_in_onstart(self): return "success!" def stream(self, id=None): for x in xrange(100000000): yield str(x) stream._cp_config = {'response.stream': True} cherrypy.config.update({'environment': 'test_suite'}) conf = { # METHOD THREE: # Declare Tools in detached config '/demo': { 'tools.numerify.on': True, 'tools.numerify.map': {"pie": "3.14159"}, }, '/demo/restricted': { 'request.show_tracebacks': False, }, '/demo/userid': { 'request.show_tracebacks': False, 'myauth.check_access.default': True, }, '/demo/errinstream': { 'response.stream': True, }, '/demo/err_in_onstart': { # Because this isn't a dict, on_start_resource will error. 'tools.numerify.map': "pie->3.14159" }, # Combined tools '/euro': { 'tools.gzip.on': True, 'tools.encode.on': True, }, # Priority specified in config '/decorated_euro/subpath': { 'tools.gzip.priority': 10, }, } cherrypy.tree.mount(root, config=conf) # Client-side code # from cherrypy.test import helper class ToolTests(helper.CPWebCase): def testHookErrors(self): self.getPage("/demo/?id=1") # If body is "razdrez", then on_end_request is being called too early. self.assertBody("A horrorshow lomtick of cherry 3.14159") # If this fails, then on_end_request isn't being called at all. time.sleep(0.1) self.getPage("/demo/ended/1") self.assertBody("True") valerr = '\n raise ValueError()\nValueError' self.getPage("/demo/err?id=3") # If body is "razdrez", then on_end_request is being called too early. self.assertErrorPage(502, pattern=valerr) # If this fails, then on_end_request isn't being called at all. time.sleep(0.1) self.getPage("/demo/ended/3") self.assertBody("True") # If body is "razdrez", then on_end_request is being called too early. self.getPage("/demo/errinstream?id=5") # Because this error is raised after the response body has # started, the status should not change to an error status. self.assertStatus("200 OK") self.assertBody("Unrecoverable error in the server.") # If this fails, then on_end_request isn't being called at all. time.sleep(0.1) self.getPage("/demo/ended/5") self.assertBody("True") # Test the "__call__" technique (compile-time decorator). self.getPage("/demo/restricted") self.assertErrorPage(401) # Test compile-time decorator with kwargs from config. self.getPage("/demo/userid") self.assertBody("Welcome!") def testEndRequestOnDrop(self): old_timeout = None try: httpserver = cherrypy.server.httpservers.keys()[0] old_timeout = httpserver.timeout except (AttributeError, IndexError): print "skipped ", return try: httpserver.timeout = timeout # Test that on_end_request is called even if the client drops. self.persistent = True try: conn = self.HTTP_CONN conn.putrequest("GET", "/demo/stream?id=9", skip_host=True) conn.putheader("Host", self.HOST) conn.endheaders() # Skip the rest of the request and close the conn. This will # cause the server's active socket to error, which *should* # result in the request being aborted, and request.close being # called all the way up the stack (including WSGI middleware), # eventually calling our on_end_request hook. finally: self.persistent = False time.sleep(timeout * 2) # Test that the on_end_request hook was called. self.getPage("/demo/ended/9") self.assertBody("True") finally: if old_timeout is not None: httpserver.timeout = old_timeout def testGuaranteedHooks(self): # The 'critical' on_start_resource hook is 'failsafe' (guaranteed # to run even if there are failures in other on_start methods). # This is NOT true of the other hooks. # Here, we have set up a failure in NumerifyTool.numerify_map, # but our 'critical' hook should run and set the error to 502. self.getPage("/demo/err_in_onstart") self.assertErrorPage(502) self.assertInBody("AttributeError: 'str' object has no attribute 'items'") def testCombinedTools(self): expectedResult = (u"Hello,world" + europoundUnicode).encode('utf-8') zbuf = StringIO.StringIO() zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=9) zfile.write(expectedResult) zfile.close() self.getPage("/euro", headers=[("Accept-Encoding", "gzip"), ("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7")]) self.assertInBody(zbuf.getvalue()[:3]) zbuf = StringIO.StringIO() zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6) zfile.write(expectedResult) zfile.close() self.getPage("/decorated_euro", headers=[("Accept-Encoding", "gzip")]) self.assertInBody(zbuf.getvalue()[:3]) # This should break because gzip's priority was lowered in conf. # Of course, we don't want breakage in production apps, # but it proves the priority was changed. self.getPage("/decorated_euro/subpath", headers=[("Accept-Encoding", "gzip")]) self.assertErrorPage(500, pattern='UnicodeEncodeError') def testBareHooks(self): content = "bit of a pain in me gulliver" self.getPage("/pipe", headers=[("Content-Length", len(content)), ("Content-Type", "text/plain")], method="POST", body=content) self.assertBody(content) if __name__ == '__main__': setup_server() helper.testmain()