Serving HTTP with continuations, or how to make Twisted and Stackless Python play nice
In my neverending quest for The One True Way To Serve Web Applications I've recently become interested in the concept of continuations and how they can make single-process, asynchronous programming as easy and intuitive as simple blocking calls. A recent example is NeverBlock, built on Ruby 1.9 fibers.
Getting really close: plain Python >=2.5 and Twisted
Having already some experience with Twisted, I decided to try to (ab)use Python 2.5 generators to implement continuations. The basic idea was to wrap the request logic inside a generator and have it yield the Twisted deferred whenever it performed an I/O operation (all the boilerplate Twisted code has been left out):
def render_GET(self, request):
    self.continuation = self.handle(request)
    # self.continuation now contains the generator object
    # advance it to at least the first yield
    df = self.continuation.next()
    # resume the generator on the df callback
    df.addCallback(self.resumer)
    return server.NOT_DONE_YET
def handle(self, request):
    result = yield dbPool.runQuery('SELECT * FROM test')
    request.write(result)
def resumer(self, result):
    try:
        self.continuation.send(result)
    except StopIteration:
        # the generator ran to completion, nothing left to resume
        pass
This worked fine. It was also useless for any practical purpose: generators only work from inside a single call frame, so handle() could not delegate to helper functions that performed I/O. Any request processing that might perform an I/O operation (and thus yield a deferred) would have needed to reside directly in the body of the method.
This is where anybody else would just use threads and call it a day. Not me, of course. I wanted to follow the Twisted philosophy of never blocking and handling thousands of requests on a single process/thread, but deferreds force the programmer to use callbacks and chop the logic up into many methods.
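The pattern and its limitation can be reproduced without Twisted. The following is only a sketch: FakeDeferred, run_query and drive are hypothetical stand-ins invented for illustration, not Twisted APIs, and the "event loop" is a plain list of pending queries.

```python
class FakeDeferred(object):
    """Hypothetical stand-in for a Twisted Deferred: collects callbacks, fires later."""
    def __init__(self):
        self.callbacks = []

    def addCallback(self, fn):
        self.callbacks.append(fn)
        return self

    def fire(self, result):
        for fn in self.callbacks:
            fn(result)


def run_query(pending, sql):
    """Pretend to start an asynchronous query; the fake event loop fires it later."""
    d = FakeDeferred()
    pending.append((sql, d))
    return d


def drive(gen, value=None, first=True):
    """Advance gen to its next yield and re-arm it on the deferred it yielded."""
    try:
        d = next(gen) if first else gen.send(value)
    except StopIteration:
        return  # the handler finished
    d.addCallback(lambda result: drive(gen, result, first=False))


pending = []
output = []

def handle():
    rows = yield run_query(pending, 'SELECT 1')
    output.append(rows)
    rows = yield run_query(pending, 'SELECT 2')
    output.append(rows)

drive(handle())
# Fake event loop: complete each pending query in turn.
while pending:
    sql, d = pending.pop(0)
    d.fire('rows for ' + sql)

# The single-frame limitation: moving a yield into a helper turns the helper
# itself into a generator, so a plain call runs none of its body.
def helper():
    rows = yield run_query(pending, 'SELECT 3')
    output.append(rows)

stalled = helper()  # only builds a generator object; no query was started
```

After the loop, output holds the results of both queries in order, while the call to helper() has started nothing at all. That second part is exactly why all the I/O had to live in the body of handle().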
Enter Stackless Python
Stackless Python is a partial reimplementation of Python that adds a continuation library to the language. Its principle is very simple: you can have any number of in-process, very lightweight, cooperative threads, called tasklets. In the main tasklet (the one your app starts on) you call stackless.run() as the main loop of your app. Inside each tasklet you call stackless.schedule() when you want to give up the CPU to the runloop. A simple inter-tasklet communication primitive rounds out the library (stackless.channel).
My idea then was to create a new tasklet for every request Twisted received, and have each one scheduled away whenever it found itself waiting on the completion of a deferred. The deferred callback would restart the tasklet, somehow. It turns out there was already an entire collection of examples combining Twisted and Stackless on Google Code.
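For readers without Stackless installed, the cooperative model can be approximated with plain generators. This is an analogy only, not the Stackless API: worker and run are hypothetical names, and a bare yield plays the role that stackless.schedule() plays inside a tasklet.

```python
from collections import deque

def worker(name, steps, log):
    for i in range(steps):
        log.append('%s step %d' % (name, i))
        yield  # give up the CPU, comparable in spirit to stackless.schedule()

def run(tasks):
    """Round-robin over the tasks until every one of them has finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # finished tasks are simply dropped
        queue.append(task)

log = []
run([worker('a', 2, log), worker('b', 3, log)])
```

The two workers interleave one step at a time. The important difference is that a real tasklet can give up control from any depth of its call stack, which is precisely what generators cannot do.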
In particular the TwistedWebserverThreaded.py example was basically what I wanted, but it highlighted a problem: both Stackless and Twisted make use of a runloop, and both of them assume they are the runloop. The example made use of a separate thread to run the Twisted runloop. I decided there was a better way, if giving up on a Stackless feature was an option.
Giving up on preemption
Stackless supports preemption by allowing its runloop to be called with a limit on the number of bytecode instructions a tasklet may execute. This allows it to reschedule a different tasklet, but it also introduces all the annoying problems associated with preemptive code, without any of the benefits (no true multicore execution). I was more than happy to give up on it since all I wanted was continuations.
And so I had an idea: make the Twisted runloop run for as long as possible, but make sure it always calls schedule() on the completion of any I/O. Since all I asked from Stackless was multiplexing I/O in a single process, and not threading emulation, this was perfectly fine for me.
Example
I've implemented an example of how to integrate Twisted and Stackless in an HTTP server. Every request spawns a tasklet, and it waits on a channel whenever it has to wait for the completion of a deferred. The callback of the deferred will signal that channel for completion and send the deferred result. Since the callback execution happens in the context of the Twisted runloop tasklet, it gives us the opportunity to call schedule() inside it, thus giving the request tasklets the chance to run again.
In the example I present a nonsensical request processor that does 3 asynchronous SQL requests, and from time to time 1 HTTP request and a very simple inter-tasklet communication in the form of a very lousy chat.
Base controller
class ResumableController():
    def tasklet(self):
        self.return_channel = NWChannel()
        self.me = stackless.getcurrent()
        self.stamp = random.randint(0, 1000000000)
        self.handle()
The chat receiver is just a blocking call into a Stackless channel.
    # chat methods
    def waitForChat(self):
        return chatChannel.receive()
The database uses the standard Twisted Enterprise asynchronous database pool.
    # db methods
    def waitForSQL(self, sql):
        d = dbPool.runQuery(sql)
        return self.waitForDeferred(d, self.succesfulSQLDeferred)
    def succesfulSQLDeferred(self, result):
        r = pickle.dumps(result)
        self.return_channel.send_nowait(r)
        self.reschedule()
The HTTP requester uses the Twisted HTTP client.
    # http client methods
    def waitForHTTPClient(self, url):
        d = client.getPage(url)
        return self.waitForDeferred(d, self.succesfulHTTPDeferred)
    def succesfulHTTPDeferred(self, result):
        r = html.PRE(result)
        self.return_channel.send_nowait(r)
        self.reschedule()
Common methods, including the common handling for deferreds.
    def waitForDeferred(self, d, success):
        d.addCallback(success)
        d.addErrback(self.errorDeferred)
        return self.waitForChannel()
    def waitForChannel(self):
        return self.return_channel.receive()
    def errorDeferred(self, fault):
        self.return_channel.send_exception_nowait(fault.type, fault.value)
        self.reschedule()
    def reschedule(self):
        if stackless.getcurrent() != self.me:
            stackless.schedule()
Twisted resource
The reactor.callLater(0.0, stackless.schedule) is the equivalent of the deferreds giving up the control to the Stackless runloop, only in a different way since we need to return first from the render_GET method.
class ClientRequestHandler(resource.Resource):
    isLeaf = True
    def __init__(self):
        resource.Resource.__init__(self)
    def render_GET(self, request):
        request.write('request arrives<br>')
        c = ExampleController(request)
        stackless.tasklet(c.tasklet)()
        request.write('still in the reactor tasklet<br>')
        reactor.callLater(0.0, stackless.schedule)
        return server.NOT_DONE_YET
Example controller
handle is where all the fun is. All the wait* methods block, but they also immediately give up control to the Stackless runloop, which ultimately gives control to the Twisted runloop. When a deferred calls back, control is given up again from Twisted to Stackless, which wakes up the request tasklet. The result is a working continuation-based HTTP server in Python, with full I/O-based scheduling!
class ExampleController(ResumableController):
    def __init__(self, request):
        self.request = request
    def handle(self):
        self.request.write('hi, we are now inside the request tasklet<br>')
        # replace this query with something valid for your DB
        sql = 'select * from test limit 10'
        self.request.write('<br><br>QUERY 1:')
        self.request.write(self.waitForSQL(sql))
        if (random.randint(0, 9) <= 1):
            self.request.write('<br><br>HTTP:')
            self.request.write(self.waitForHTTPClient('http://www.google.com/'))
        self.request.write('<br><br>QUERY 2:')
        self.request.write(self.waitForSQL(sql))
        if (random.randint(0, 9) <= 1):
            self.request.write('<br><br>CHAT PRODUCER:')
            self.request.write('sending to {0:d} clients'.format(chatChannel.balance))
            chatChannel.send('hi from {0:d} (and to other {1:d} chaps)'.format(self.stamp, chatChannel.balance))
        if (random.randint(0, 9) <= 2):
            self.request.write('<br><br>CHAT CONSUMER:')
            self.request.write(self.waitForChat())
        self.request.write('<br><br>QUERY 3:')
        self.request.write(self.waitForSQL(sql))
        self.request.finish()
Download
You can download the full example from here: server-twisted-stackless.zip. If you don't have a database on hand just comment out the pool creation and replace the database calls with Google.com fetches for example. If some requests appear to hang it just means they are waiting for chat messages from other requests. This is expected since it tries to emulate the behavior of Comet applications. Comment out the waitForChat call if it annoys you.
The TicketStumbler guys write on switching their spider to Twisted and learning from the experience:
Twisted is Event Driven and Asychronous meaning that things don’t necessarily happen in sequential order and code should be written to respond to events such as “the data is ready” or “his ass is grass.” Despite knowledge of over a dozen programming languages and familiarity with using threads and processes to run sequential code blocks beside one another, this was a very new concept to me and one I initially found difficult to grasp enough to write code for it (though using it was quite straight forward).
Simple HTTP PubSub server with Twisted
Jabber and its XMPP protocol pioneered the basic idea behind most contemporary HTTP push implementations. They call it BOSH:
The technique employed by this protocol achieves both low latency and low bandwidth consumption by encouraging the connection manager not to respond to a request until it actually has data to send to the client. As soon as the client receives a response from the connection manager it sends another request, thereby ensuring that the connection manager is (almost) always holding a request that it can use to "push" data to the client.
Basically it goes like this:
- The client makes an HTTP request to the server
- The server does not send any response
- When the server has asynchronous data to send to the client, it looks for an open, pending connection from step 2, and sends the data as the request response
- The client receives the data and immediately starts another request like in step 1
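The four steps can be modelled without any networking. This is a toy sketch, not a BOSH implementation: LongPollServer and LongPollClient are hypothetical names, and a "request" is just a callback the server holds on to until it has something to say.

```python
class LongPollServer(object):
    """Toy model of the server side: holds requests until data shows up."""
    def __init__(self):
        self.pending = []  # callbacks of requests that have no response yet

    def request(self, respond):
        # Steps 1 and 2: accept the request but do not answer it.
        self.pending.append(respond)

    def push(self, data):
        # Step 3: answer every held request with the new data. The list is
        # swapped out first because each client re-registers while we iterate.
        waiting, self.pending = self.pending, []
        for respond in waiting:
            respond(data)


class LongPollClient(object):
    def __init__(self, server):
        self.server = server
        self.received = []
        self.poll()

    def poll(self):
        self.server.request(self.on_response)

    def on_response(self, data):
        # Step 4: consume the data, then immediately ask again.
        self.received.append(data)
        self.poll()


server = LongPollServer()
client = LongPollClient(server)
server.push('first')
server.push('second')
```

After both pushes the client has received both messages and the server is again holding exactly one pending request, ready for the next push.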
This is not the task for Apache
The traditional fork-and-serve model for HTTP servers does not work with the BOSH pattern. You can force it into submission by using shared memory or switching to threads with critical sections. But in the end you are using a process and/or a thread for every push client connection. And those connections are going to be open and silent for 5 minutes, tying up with them the entire child process that was forked to handle them. In those same 5 minutes that thread or process could have handled thousands of normal, short-lived requests.
The right tool for the job
Single-process, single-threaded event servers are all the rage nowadays. They offer a processing model based on simultaneously handling thousands of requests in a single process, using the very efficient asynchronous I/O primitives that most current operating systems offer.
But a BOSH-like server has to go a step beyond. The actual logic of the server, not just its socket plumbing, has to be asynchronous and event-based. For this reason you just cannot put an FCGI PHP spawner behind an nginx frontend and expect to scale beyond 10 push clients. You won't DoS the nginx process with your push connections, but the FCGI processes are still running under the 1 connection = 1 process paradigm.
For this reason I decided to write my own server, helped by the many asynchronous programming libraries and frameworks available. I discarded a C, libevent-based implementation early on and went on to look into EventMachine. It's a very focused and lean Ruby framework implemented on top of libevent, and it offers standard servers and clients for some protocols, like HTTP. In the end I went with Twisted. I've wanted to try Python for a long time and Twisted looks like a very stable and well-tested framework.
The PubSub paradigm
For this small server I used a very simple model for the push server. The server is independent from the application model and it only concerns itself with a single task: maintaining a list of open client requests, sorted by a channel ID. This ID is opaque to the push server. Clients ask the push server to be put on the waiting list of a certain channel, identified by its ID. And the application server asks the push server to wake up the waiting clients of a certain channel, identified by its ID.
This is a simple interpretation of the Publish/Subscribe paradigm, and in this implementation the server is not going to care about queuing messages, complex grouping of clients, and all the other features it would need to be a robust, generic PubSub system like Bayeux.
Implementation
The server has 2 open ports:
- Port 8080: open to the internet. This is the port clients connect to subscribe and wait for a channel notification.
- Port 9100: open only to localhost. This is the port that the application server, which is implemented in Apache + PHP, connects to deliver a notification to a channel. Connections to this port are fast and short-lived.
For testing and developing the server I modified Peak Notes to give it a Comet-like behavior. Peak Notes is already a full Ajax application, with a full client model, so it was just a matter of requesting a new snapshot of the current note board when a notification arrived. A more intelligent and efficient implementation would return the data that actually changed in the notification request.
The PHP application server code was very easily modified: every operation that changed the database in any way also sent a notification to the appropriate channel, by using the notification server on port 9100. Channel IDs are just a salted MD5 of the user ID, and this ID is also transferred to the client during login. A shared note board would need some other naming scheme for the channel IDs, but those are currently unsupported.
The client resources, code and Ajax requests are all served from notes.olivepeak.com on port 80, but the PubSub server was on port 8080. This posed an interesting cross-domain problem typical in JavaScript. It was solved by using a JSONP wrapper for the push connections.
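As an illustration of the channel naming, here is a Python sketch of a salted MD5 (the real code lives in the PHP application and is not shown here). The salt value, the function name and the salt-then-ID concatenation order are all assumptions.

```python
import hashlib

# Hypothetical salt; a real deployment would keep its own secret value.
CHANNEL_SALT = 'example-salt'

def channel_id_for_user(user_id, salt=CHANNEL_SALT):
    """Return a 32-character hex channel ID derived from the user ID."""
    data = (salt + str(user_id)).encode('utf-8')
    return hashlib.md5(data).hexdigest()

alice_channel = channel_id_for_user(42)
bob_channel = channel_id_for_user(43)
```

The same user always maps to the same channel, and without the salt a client cannot derive the channel ID of another user from a guessed user ID.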
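A JSONP response is nothing more than a JSON payload wrapped in a call to a function whose name the client supplies. The sketch below is a hypothetical helper, not the server's actual code; restricting the callback to a plain JavaScript identifier is an extra precaution that I am assuming here.

```python
import json
import re

# Accept only plain JavaScript identifiers as callback names (an assumption).
_CALLBACK_RE = re.compile(r'[A-Za-z_$][A-Za-z0-9_$]*\Z')

def jsonp(callback, payload):
    """Wrap payload, serialized as JSON, in a call to the named callback."""
    if not _CALLBACK_RE.match(callback):
        raise ValueError('invalid JSONP callback name: %r' % (callback,))
    return '%s(%s)' % (callback, json.dumps(payload, sort_keys=True))

body = jsonp('test', {'status': 'changed', 'revision': '100'})
```

The browser loads the push URL through a script tag, so the response body is executed as JavaScript and the named function receives the notification object.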
simple-pubsub.py
This is the Twisted HTTP resource that handles client subscriptions. On an incoming GET it checks for a fixed path in the URI, /subscriptions/channel/, and tries to extract the channel ID from it. On success it adds the request to the channel list and returns server.NOT_DONE_YET. This tells Twisted to not return anything to the client, but leave the connection open for later processing.
class ClientRequester(resource.Resource):
    isLeaf = True
    def __init__(self, i):
        resource.Resource.__init__(self)
        self.internal = i
    def render_GET(self, request):
        channel = getChannelID('/subscriptions/channel/', request.uri)
        if (channel == False):
            return '{ "status" : "error" }'
        self.internal.addClientDelegate(channel, ClientDelegate(request))
        return server.NOT_DONE_YET
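The getChannelID helper used by the resource above is not listed in this post. A hypothetical version, assuming channel IDs are 32 lowercase hex characters (an MD5 digest) and that failure is signalled by returning False as the caller expects, could look like this:

```python
import re

# Assumption: a channel ID is an MD5 hex digest, i.e. 32 lowercase hex digits.
_CHANNEL_RE = re.compile(r'[0-9a-f]{32}\Z')

def getChannelID(prefix, uri):
    """Return the channel ID that follows prefix in uri, or False."""
    path = uri.split('?', 1)[0]  # drop the query string, if any
    if not path.startswith(prefix):
        return False
    channel = path[len(prefix):]
    if not _CHANNEL_RE.match(channel):
        return False
    return channel

found = getChannelID(
    '/subscriptions/channel/',
    '/subscriptions/channel/9cdfb439c7876e703e307864c9167a15?callback=test')
missing = getChannelID('/subscriptions/channel/', '/somewhere/else')
```

Validating the ID format up front keeps arbitrary client-supplied strings from becoming keys in the channel list.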
ClientDelegate wraps the client connection and has the methods for sending back the notification and closing the connection. It also wraps the notification JSON in the client-provided callback function.
class ClientDelegate:
    def __init__(self, r):
        self.request = r
        # JSONP stuff, check errors bla bla bla
        self.callbackName = r.args['callback'][0]
    def end(self, status, revision):
        try:
            rev = ''
            if revision != False:
                rev = ', "revision" : "' + revision + '"'
            self.request.write(self.callbackName + '({ "status" : "' + status + '"' + rev + '})')
            self.request.finish()
        except:
            print "Client lost patience"
    def notify(self, revision):
        self.end('changed', revision)
NotificationRequester is the Twisted HTTP resource that handles the application server notification requests. It also expects a channel ID after the /notifications/channel/ part of the URI. There's also support for sending a revision ID to the clients. This is an extra bit of information specific to Peak Notes, and it is used for optimizing and skipping some model synchronizations. A more featured, fine-grained delivery would be better.
class NotificationRequester(resource.Resource):
    isLeaf = True
    def __init__(self):
        resource.Resource.__init__(self)
        self.clients = { }
    def render_GET(self, request):
        channel = getChannelID('/notifications/channel/', request.uri)
        if (channel == False):
            return '{ "status" : "error" }'
        # the revision is optional; ClientDelegate.end() skips it when False
        revision = request.args.get('revision', [False])[0]
        print "Waking up clients on channel " + channel
        if (channel in self.clients):
            # swap the list out first so that new subscribers are not lost
            oldL = self.clients[channel]
            self.clients[channel] = [ ]
            for client in oldL:
                client.notify(revision)
        return '{ "status" : "ok" }'
    def addClientDelegate(self, channel, delegate):
        if (channel in self.clients):
            self.clients[channel].append(delegate)
        else:
            self.clients[channel] = [ delegate ]
        print "Registered new client into channel " + channel
        print "Current clients:"
        print self.clients
You can find the full source of the server here. There is some additional code to handle server-side cleanups of client timeouts and the twistd initialization code. To run it invoke twistd as twistd -noy simple-pubsub.py. Simple testing on localhost with a browser is possible. For example, open a browser window with the following URL:
http://localhost:8080/subscriptions/channel/9cdfb439c7876e703e307864c9167a15?callback=test
The browser will try to load it, waiting for data to arrive. Then in a second browser window open:
http://localhost:9100/notifications/channel/9cdfb439c7876e703e307864c9167a15?revision=100
This request will be served immediately, and the first one will finish loading and show a JavaScript snippet: test({ "status" : "changed", "revision" : "100"})
Update: I've moved the Comet bits to the public Peak Notes server.
