Haxin Mainframes

A blog about stuff I do, find interesting, or want to blab about.

Using UDP in the Python Tornado Framework

A little while ago I was working on an API endpoint that needed to ask the BitTorrent Live video streaming trackers how many people were watching which swarms. To do this, I had to send the byte 4 to the tracker on a particular IP and port. We were using Tornado. Previously, to use UDP sockets with the Tornado event loop (in my Python DHT project, for example), I just created a non-blocking UDP socket and added a handler for the READ state.

self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setblocking(0)
self.io_loop.add_handler(self.sock.fileno(), self.handle_input, self.io_loop.READ)
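If you want to see the mechanics without Tornado, here is a rough stdlib-only analog using Python 3's selectors module: a non-blocking UDP socket registered for read readiness, plus a one-turn "event loop" that calls the handler with (socket, events), much like Tornado calls handle_input(fd, events). This is just a sketch, not Tornado's implementation; make_udp_socket, run_once and demo are made-up names, and it binds to 127.0.0.1 on an OS-chosen port purely for illustration.

```python
import selectors
import socket


def make_udp_socket(host="127.0.0.1", port=0):
    """Create a non-blocking UDP socket bound to host:port (port 0 = OS picks)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setblocking(False)
    sock.bind((host, port))
    return sock


def run_once(selector, timeout=1.0):
    """One turn of a toy event loop: call each ready handler as handler(sock, events)."""
    ready = selector.select(timeout)
    for key, events in ready:
        handler = key.data  # the callback stored at register() time
        handler(key.fileobj, events)
    return len(ready)


def demo():
    received = []

    def handle_input(sock, events):
        # Same shape as the Tornado handler: read one datagram and its source
        data, source = sock.recvfrom(4096)
        received.append((data, source))

    selector = selectors.DefaultSelector()
    listener = make_udp_socket()
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        selector.register(listener, selectors.EVENT_READ, handle_input)
        sender.sendto(b"\x04", listener.getsockname())
        run_once(selector)
    finally:
        selector.close()
        listener.close()
        sender.close()
    return received
```

The handler only runs when the selector reports the socket readable, which is why the socket can be non-blocking without recvfrom ever raising "would block" in the normal case.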

The callback for this handler looked like this:

def handle_input(self, fd, events):
    (data, source_ip_port) = self.sock.recvfrom(4096)
    bdict = bdecode(data)

    # Got a response to one of our previous queries
    if bdict["y"] == "r":
        self.handle_response(bdict, source_ip_port)

    # Got a query from another node
    # (probably going to have to add a separate listener socket for these)
    elif bdict["y"] == "q":
        self.handle_query(bdict, source_ip_port)
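bdecode comes from elsewhere in the DHT project and isn't shown here. For reference, here is a minimal Python 3 sketch of a bencode decoder (integers, byte strings, lists and dictionaries) plus the same dispatch on the KRPC "y" key. In Python 3 the decoded keys and values are bytes, so the comparisons use b"r" and b"q". Both bdecode and dispatch below are hypothetical stand-ins, not whatever bencode library the project actually imported, and the sample message is modeled on the ping response example in BEP 5.

```python
def bdecode(data):
    """Decode bencoded bytes: i<int>e, <len>:<bytes>, l...e and d...e."""

    def decode(i):
        c = data[i:i + 1]
        if c == b"i":  # integer: i42e
            end = data.index(b"e", i)
            return int(data[i + 1:end]), end + 1
        if c == b"l":  # list: l<items>e
            i += 1
            items = []
            while data[i:i + 1] != b"e":
                item, i = decode(i)
                items.append(item)
            return items, i + 1
        if c == b"d":  # dict: d<key><value>...e
            i += 1
            result = {}
            while data[i:i + 1] != b"e":
                key, i = decode(i)
                value, i = decode(i)
                result[key] = value
            return result, i + 1
        if c.isdigit():  # byte string: 4:spam
            colon = data.index(b":", i)
            start = colon + 1
            length = int(data[i:colon])
            return data[start:start + length], start + length
        raise ValueError("invalid bencode at offset %d" % i)

    value, end = decode(0)
    if end != len(data):
        raise ValueError("trailing or truncated data at offset %d" % end)
    return value


def dispatch(data, source, handlers):
    """Decode one KRPC message and call handlers[msg[b"y"]](msg, source)."""
    msg = bdecode(data)
    if not isinstance(msg, dict):
        return None
    handler = handlers.get(msg.get(b"y"))  # b"q" query, b"r" response, b"e" error
    if handler is None:
        return None  # unknown or unhandled message type
    return handler(msg, source)


PING_RESPONSE = b"d1:rd2:id20:mnopqrstuvwxyz123456e1:t2:aa1:y1:re"

if __name__ == "__main__":
    msg = bdecode(PING_RESPONSE)
    print(msg[b"y"], msg[b"r"][b"id"])  # b'r' b'mnopqrstuvwxyz123456'
```

Using a dict of handlers instead of an if/elif chain makes it easy to add the "e" (error) message type later.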

I believe this is an appropriate way to handle UDP sockets in Tornado, since as far as I know the library only ships with TCP- and HTTP-based clients. However, my friend Kyle Graehl put together a nice UDP wrapper class that is modeled closely on, and even borrows methods from, Tornado's general IOStream class.

Using UDPSockWrapper, I was able to do something like this (I added __enter__ and __exit__ methods so it works in a with statement, although I'm not sure that is actually more Pythonic than a try/finally):

udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpsock.setblocking(0)
udpsockwrapper = UDPSockWrapper(udpsock, in_ioloop=io_loop)
response = None
with udpsockwrapper:
    udpsockwrapper.sendto(b"\x04", (tracker_ip, int(tracker_port)))
    response = yield gen.Task(udpsockwrapper.read_chunk)
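On the __enter__/__exit__ versus try/finally question: the standard library already covers this case. contextlib.closing wraps any object that has a close() method so it can be used in a with statement, and it behaves like a try/finally that calls close(). The sketch below uses a hypothetical FakeSockWrapper in place of UDPSockWrapper so it runs on its own.

```python
import contextlib


class FakeSockWrapper:
    """Hypothetical stand-in for UDPSockWrapper; only close() matters here."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def use_with_closing(wrapper, fail=False):
    # contextlib.closing calls wrapper.close() on exit, even if an exception escapes
    with contextlib.closing(wrapper) as w:
        if fail:
            raise RuntimeError("simulated read failure")
        return w is wrapper  # closing() hands back the wrapped object itself


def use_try_finally(wrapper, fail=False):
    # The same cleanup spelled out by hand
    try:
        if fail:
            raise RuntimeError("simulated read failure")
        return True
    finally:
        wrapper.close()
```

So all three options (custom __enter__/__exit__, contextlib.closing, try/finally) end up closing the socket the same way; closing() just saves writing the two methods.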

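Notice the yield gen.Task above? That is Tornado's gen module at work. It lets you write your function as a generator: each yield hands Tornado an asynchronous operation, and Tornado resumes the generator with the result once that operation's callback fires. This lets you take nested callback code and write it in a synchronous style. For this to work, the enclosing function has to be decorated with @gen.engine (or @gen.coroutine in Tornado 3.0 and later); note that gen.Task itself was removed in Tornado 6.0. I believe this is similar to yielding Deferreds inside a function decorated with @inlineCallbacks in the Twisted framework.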
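To make the "resume the generator when the callback fires" idea concrete, here is a toy trampoline in plain Python 3. It is not how Tornado's gen module is actually implemented (that also handles errors, futures, parallel yields and more); it only shows the core trick: each yielded value is a function that accepts callback=..., and the driver resumes the generator with whatever that callback receives. run_task_generator, fake_read and handler are hypothetical names.

```python
def run_task_generator(gen_func, *args):
    """Drive a generator whose yields are functions taking callback=...

    Returns a dict that gets a "value" key once the generator finishes.
    """
    gen = gen_func(*args)
    state = {}

    def step(value=None):
        try:
            task = gen.send(value)  # resume, and get the next task to start
        except StopIteration as stop:
            state["value"] = stop.value  # the generator's return value
            return
        # Start the task; when it calls back, step() resumes the generator
        task(callback=step)

    step()
    return state


pending = []  # callbacks queued by our pretend event loop


def fake_read(callback):
    # Hypothetical async read: it completes later, when the "loop" runs
    pending.append(lambda: callback(b"pong"))


def handler():
    first = yield fake_read
    second = yield fake_read
    return first + b"|" + second


state = run_task_generator(handler)
print(state)  # {}  (still parked on the first yield)
while pending:  # the "event loop": fire queued callbacks one by one
    pending.pop(0)()
print(state["value"])  # b'pong|pong'
```

The handler reads top to bottom like blocking code, but nothing actually blocks: between yields, control is back in the loop.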

My modified version of UDPSockWrapper:

import datetime
import logging
import socket

import tornado.ioloop

# From Kyle Graehl - http://kyle.graehl.org/
# The __enter__ and __exit__ methods are added by me; probably not the best
# way to use them, though.
class UDPSockWrapper(object):
    def __init__(self, sock, in_ioloop=None):
        self.socket = sock
        self._state = None
        self._read_callback = None
        self._read_timeout = None
        self.ioloop = in_ioloop or tornado.ioloop.IOLoop.instance()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def __repr__(self):
        fileno = self.socket.fileno() if self.socket else None
        return "<UDPSockWrap:%s,rc:%s>" % (fileno, self._read_callback)

    def _add_io_state(self, state):
        # Register with the IOLoop on first use, otherwise just widen the event mask
        if self._state is None:
            self._state = tornado.ioloop.IOLoop.ERROR | state
            self.ioloop.add_handler(
                self.socket.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.ioloop.update_handler(self.socket.fileno(), self._state)

    def sendto(self, msg, dest):
        return self.socket.sendto(msg, dest)

    def recv(self, sz):
        return self.socket.recv(sz)

    def close(self):
        if self.socket is None:
            return  # already closed
        if self._read_timeout is not None:
            self.ioloop.remove_timeout(self._read_timeout)
            self._read_timeout = None
        self.ioloop.remove_handler(self.socket.fileno())
        self.socket.close()
        self.socket = None

    def read_chunk(self, callback=None, timeout=4):
        # Calls callback(data) on the next datagram,
        # or callback(None, error='timeout') after `timeout` seconds
        self._read_callback = callback
        self._read_timeout = self.ioloop.add_timeout(
            datetime.timedelta(seconds=timeout), self.check_read_callback)
        self._add_io_state(self.ioloop.READ)

    def check_read_callback(self):
        self._read_timeout = None
        if self._read_callback:
            # Clear the callback first so a late datagram can't fire it twice
            callback, self._read_callback = self._read_callback, None
            # XXX close socket?
            callback(None, error='timeout')

    def _handle_read(self):
        if self._read_timeout is not None:
            self.ioloop.remove_timeout(self._read_timeout)
            self._read_timeout = None
        if self._read_callback:
            try:
                data = self.socket.recv(4096)
            except socket.error:
                # e.g. connection refused after an ICMP port unreachable
                data = None
            callback, self._read_callback = self._read_callback, None
            callback(data)

    def _handle_events(self, fd, events):
        if events & self.ioloop.READ:
            self._handle_read()
        if events & self.ioloop.ERROR:
            logging.error('%s event error' % self)
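For comparison, the same "send one datagram, then wait for one reply with a timeout" pattern that read_chunk implements can be written with Python 3's asyncio and no Tornado at all. This is a sketch under my own assumptions: udp_request, the protocol classes and the demo coroutines are made-up names, the 4-second default just mirrors read_chunk, and the echo server stands in for a real tracker (the tracker's reply format isn't covered here).

```python
import asyncio
import socket


class _OneDatagram(asyncio.DatagramProtocol):
    """Resolve a future with the first datagram received, or with the error."""

    def __init__(self, future):
        self.future = future

    def datagram_received(self, data, addr):
        if not self.future.done():
            self.future.set_result(data)

    def error_received(self, exc):
        # e.g. ConnectionRefusedError after an ICMP "port unreachable"
        if not self.future.done():
            self.future.set_exception(exc)


async def udp_request(payload, addr, timeout=4.0):
    """Send one datagram to addr and wait up to `timeout` seconds for a reply."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: _OneDatagram(future), remote_addr=addr)
    try:
        transport.sendto(payload)
        # Raises asyncio.TimeoutError if nothing arrives in time
        return await asyncio.wait_for(future, timeout)
    finally:
        transport.close()


class _Echo(asyncio.DatagramProtocol):
    """Stand-in 'tracker' that echoes every datagram back to its sender."""

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)


async def echo_demo():
    loop = asyncio.get_running_loop()
    server, _ = await loop.create_datagram_endpoint(
        _Echo, local_addr=("127.0.0.1", 0))
    try:
        return await udp_request(b"\x04", server.get_extra_info("sockname"), timeout=2)
    finally:
        server.close()


async def timeout_demo():
    # A bound socket that never replies, so the request has to time out
    silent = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    silent.bind(("127.0.0.1", 0))
    try:
        await udp_request(b"\x04", silent.getsockname(), timeout=0.2)
        return "reply"
    except asyncio.TimeoutError:
        return "timeout"
    finally:
        silent.close()


if __name__ == "__main__":
    print(asyncio.run(echo_demo()))
    print(asyncio.run(timeout_demo()))
```

Like read_chunk, this gives up after the timeout; unlike read_chunk, the timeout surfaces as an exception rather than callback(None, error='timeout'), and an ICMP error typically surfaces as ConnectionRefusedError instead of data=None.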

Another side note: read_chunk above takes a callback keyword argument. gen.Task requires this, because it calls the function you give it with a keyword argument named callback. To convert a function that takes its callback positionally into one that accepts this callback kwarg, I used this lambda:

lambda **kwargs: db.get_item('users', {"HashKeyElement": {"S": username}}, kwargs['callback'])

You can then pass the lambda to gen.Task:

yield gen.Task(
    lambda **kwargs: db.get_item('users', {"HashKeyElement": {"S": username}}, kwargs['callback']))
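If you do this a lot, a small helper saves writing the lambda each time. This is a sketch assuming the wrapped function takes its callback as the last positional argument, like the db.get_item call above; with_callback_kwarg and fake_get_item are hypothetical names.

```python
def with_callback_kwarg(func, *args):
    """Adapt func(*args, callback) into f(callback=None) for gen.Task-style APIs."""
    def adapted(callback=None):
        # Forward the keyword callback as func's final positional argument
        return func(*args, callback)
    return adapted


def fake_get_item(table, key, callback):
    """Hypothetical stand-in for db.get_item: 'looks up' a row and calls back."""
    callback({"table": table, "user": key["HashKeyElement"]["S"]})


results = []
task = with_callback_kwarg(fake_get_item, "users", {"HashKeyElement": {"S": "alice"}})
task(callback=results.append)
print(results)  # [{'table': 'users', 'user': 'alice'}]
```

With this, the call would read yield gen.Task(with_callback_kwarg(db.get_item, 'users', key)). And if a function's callback parameter is literally named callback, you don't need an adapter at all: gen.Task passes callback as a keyword argument, so gen.Task(func, arg1, arg2) works directly.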