import unittest import os import time from re import compile as reCompile from threading import Thread from Queue import Queue, Empty from urllib import urlopen try: # for Python < 2.3 True, False except NameError: True, False = 1, 0 class AppServerTest(unittest.TestCase): def setUp(self): """Setup fixture and test starting.""" workdir = self.workDir() dirname = os.path.dirname webwaredir = dirname(dirname(dirname(workdir))) launch = os.path.join(workdir, 'Launch.py') cmd = "python %s --webware-dir=%s --work-dir=%s" \ " ThreadedAppServer" % (launch, webwaredir, workdir) self._output = os.popen(cmd) # Setting the appserver output to non-blocking mode # could be done as follows on Unix systems: # fcntl.fcntl(self._output.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # But, since this does not work on Windows systems, # we will pull the output in a separate thread: def pullStream(stream, queue): while stream: line = stream.readline() if not line: break queue.put(line) self._queue = Queue() self._thread = Thread(target=pullStream, args=(self._output, self._queue)) self._thread.start() self.assertAppServerSays('^WebKit AppServer ') self.assertAppServerSays(' Webware for Python.$') self.assertAppServerSays(' by Chuck Esterbrook.') self.assertAppServerSays('^WebKit and Webware are open source.$') self.assertAppServerSays('^EnableHTTP\s*=\s*(True|1)$') self.assertAppServerSays('^HTTPPort\s*=\s*8080$') self.assertAppServerSays('^Host\s*=\s*(localhost|127.0.0.1)$') self.assertAppServerSays('^Ready (.*).$') # We will also test the built-in HTTP server with this: try: data = urlopen('http://localhost:8080').read() except IOError: data = '