Irrational Exuberance!

Using Threadpools in Python

February 10, 2009. Filed under python

Like most programmers, I take a secret pleasure from using threads in my projects. Oh, I know minor oversights can introduce deadlocks which will wreak hard-to-diagnose havoc, but let's face it, threads are the gateway to delirious daydreams about how effectively your new script utilizes multiple cores... or would utilize them, if anyone was actually using it.

Fortunately for those of us seeking that misguided high, Python makes putting together a sane thread pool quick and easy. All you need are the standard library modules Queue and threading.[1] Queue.Queue is a thread-safe queue, and threading.Thread provides a simple interface for creating threads.

Let's look at an example of safely using threads in Python.

Processing a CSV File with a Threadpool

This script walks through a comma-separated-values file and uses a thread pool to process the rows it reads.

from __future__ import with_statement
from csv import DictReader
from Queue import Queue
from threading import Thread

q = Queue()
workers = []

def worker():
    # each worker loops forever, pulling rows off the shared queue
    while True:
        line = q.get()
        print "processing: %s\n\n" % line
        q.task_done()

for i in range(10):
    t = Thread(target=worker)
    t.setDaemon(True)  # daemon threads won't keep the interpreter alive
    workers.append(t)
    t.start()

with open('myfile.csv', 'r') as fin:
    for line in DictReader(fin):
        q.put(line)

q.join()  # block until every queued row has been marked done

A few notes:

  • This is Python 2.5 code; for Python 2.6 you'd drop the from __future__ import. For Py3k you'd switch the from Queue import Queue to from queue import Queue, as well as adding parens to print; there's a sketch of the full port after these notes. (Did I miss anything? Haven't actually used Py3k yet...)

  • We're using csv.DictReader instead of csv.reader because it returns a dictionary of key=>value mappings (treating the very first row as the keys), whereas csv.reader returns each row as a plain list; there's a quick comparison after these notes.

  • You don't have to keep the threads in a list (the workers list above), but it's an easy way to keep references to them.

  • I use range here, as it is the last-function-standing in Py3k, but certainly it would be more appropriate to use xrange for Py2.x code.
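For the curious, here's roughly what the same pool looks like ported to Py3k. This is an untested sketch (as noted above, I haven't actually put Py3k through its paces), but the changes should amount to the queue import, the print parens, and the daemon attribute:

from csv import DictReader
from queue import Queue
from threading import Thread

q = Queue()
workers = []

def worker():
    while True:
        line = q.get()
        print("processing: %s\n" % line)
        q.task_done()

for i in range(10):
    t = Thread(target=worker)
    t.daemon = True  # same effect as setDaemon(True)
    workers.append(t)
    t.start()

with open('myfile.csv', 'r') as fin:
    for line in DictReader(fin):
        q.put(line)

q.join()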
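And to make the csv.DictReader note concrete, here's the difference at the interpreter, assuming a myfile.csv whose first row is a header (the column names here are made up for illustration):

>>> open('myfile.csv').read()
'name,price\napple,0.40\n'
>>> from csv import reader, DictReader
>>> list(reader(open('myfile.csv')))
[['name', 'price'], ['apple', '0.40']]
>>> list(DictReader(open('myfile.csv')))
[{'name': 'apple', 'price': '0.40'}]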

We can make the above script a bit more robust with ample usage of the logging and optparse modules.

from __future__ import with_statement
import logging, sys
from optparse import OptionParser
from csv import DictReader
from Queue import Queue
from threading import Thread

def manage_csv_file(filename, logger, threads=10):
    q = Queue()
    workers = []

    def worker():
        while True:
            line = q.get()
            try:
                print "processing: %s\n" % line
            except Exception, e:
                # log and keep going; one bad row shouldn't kill the pool
                logger.error("worker-thread: %s" % e)
            q.task_done()

    for i in range(threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        workers.append(t)
        t.start()

    with open(filename, 'r') as fin:
        for line in DictReader(fin):
            q.put(line)

    q.join()

def main():
    p = OptionParser("usage: pool.py somefile.csv")
    p.add_option('-t', '--threads', dest='threads',
                 help='quantity of THREADS for processing',
                 metavar='THREADS')
    (options, args) = p.parse_args()
    if not args:
        p.error("a filename is required")
    filename = args[0]
    threads = int(options.threads) if options.threads else 10

    loggerLevel = logging.DEBUG
    logger = logging.getLogger("FileManager")
    logger.setLevel(loggerLevel)
    ch = logging.StreamHandler()
    ch.setLevel(loggerLevel)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    if filename.endswith('.csv'):
        manage_csv_file(filename, logger, threads=threads)
    else:
        print "Cannot handle file %s" % filename

if __name__ == '__main__':
    sys.exit(main())

From there you can tweak things to do some real work with the parsed data instead of just printing it.
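As a sketch, here's a replacement worker that totals a hypothetical price column; the column name and the running-total scheme are invented for illustration, and q and logger come from the enclosing manage_csv_file:

from threading import Lock

total = [0.0]        # mutable cell shared by all the workers
total_lock = Lock()  # serialize updates to the shared total

def worker():
    while True:
        line = q.get()
        try:
            value = float(line['price'])  # hypothetical column
            with total_lock:
                total[0] += value
        except Exception, e:
            logger.error("worker-thread: %s" % e)
        q.task_done()

Or you could extend the script to handle another format...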

Adding Support for XML Processing

You might call the rest of this article contrived, but I'm brimming with freshly minted knowledge of ElementTree, so let's throw together a manage_xml_file function.

Let's say that we're getting an XML document that is in this format...

<list>
    <item/>
    <item/>
    <item/>
</list>

...and that we want to process each item. But let's throw in a wrench. Let's say that we know we want to process the root's children, but that we don't even know the name of the root, or the name of the children. And, damn it, we want to do it fast and in a single pass. Let's get really crazy: let's say that we want to extract as many well-formed elements as possible even if the XML document eventually turns out to be malformed.[2]

First we need to add an import for ElementTree. For Python 2.x it's worth your time to try importing cElementTree if it's available.[3]

try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    from xml.etree import ElementTree

In Python 3.0 the distinction between C and Python modules is largely eliminated, so this try...except dance is unnecessary, and you can get away with just one import.

from xml.etree import ElementTree
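It's also worth seeing what iterparse actually hands you: a stream of (event, element) pairs as tags open and close, which is what makes a single pass possible. A quick interpreter sketch:

>>> from StringIO import StringIO
>>> from xml.etree import cElementTree as ElementTree
>>> doc = StringIO("<list><item>a</item><item>b</item></list>")
>>> for event, elem in ElementTree.iterparse(doc, events=('start', 'end')):
...     print event, elem.tag
...
start list
start item
end item
start item
end item
end list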

Now let's get our hands dirty.

def manage_xml_file(filename, logger, threads=10):
    def serialize(e):
        'Recursively serialize an ElementTree.Element'
        text = e.text
        attributes = e.items()
        kids = e.getchildren()
        if text and not (attributes or kids):
            # a leaf with only text serializes to a plain string
            return text
        d = {}
        if text:
            d['_text'] = text
        for key, val in attributes:
            d[key] = val
        for kid in kids:
            tag = kid.tag
            serialized = serialize(kid)
            if tag in d:
                # repeated child tags collapse into a list
                val = d[tag]
                if isinstance(val, list):
                    val.append(serialized)
                else:
                    d[tag] = [val, serialized]
            else:
                d[tag] = serialized
        return d

    q = Queue()
    workers = []

    def worker():
        while True:
            data = q.get()
            try:
                print "processing: %s\n" % data
            except Exception, e:
                logger.error("worker-thread: %s" % e)
            q.task_done()

    for i in range(threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        workers.append(t)
        t.start()

    item_tag = None
    reader = iter(ElementTree.iterparse(filename, events=('start', 'end')))
    reader.next()  # discard the root element's 'start' event
    try:
        for event, elem in reader:
            if item_tag is None:
                item_tag = elem.tag  # first tag we see is the item tag
            if event == 'end' and elem.tag == item_tag:
                q.put(serialize(elem))
                elem.clear()  # free the parsed element to keep memory flat
    except SyntaxError, e:
        logger.critical("encountered invalid xml, %s" % e)

    q.join()

A few notes on this script as well:

  • I find it much more pleasant to serialize the elements into native Python dictionaries and then process those; there's an example of the output after these notes.

    Yes, the serialize function is a bit verbose. And yes, on a sufficiently nested XML element it would run into Python's recursion limit. I'm okay with that.

    It turns out that serializing XML to any combination of dictionaries and lists is a bit of a black art, but I think that serialize gets it pretty much right.

  • iterparse is kind enough to tell us what line and character it fails on, which is more generous than many XML libraries, so we get to extract as much data as possible and know where the error is. You're (unfortunately) going to be hard pressed to get a whole lot more out of malformed XML.

    iterparse is also among the fastest XML processing options available to Python, which is a nice side-effect of its other nice aspects.
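To make the serialization concrete, here's roughly what serialize returns for a small element, assuming you've lifted it out to module level to play with it; note how the repeated name tag collapses into a list:

>>> xml = '<item id="1"><name>apple</name><name>pomme</name></item>'
>>> serialize(ElementTree.fromstring(xml))
{'id': '1', 'name': ['apple', 'pomme']}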

Then we'd need to update main to check whether files end with .xml and apply manage_xml_file accordingly.

if filename.endswith('.csv'):
    manage_csv_file(filename, logger, threads=threads)
elif filename.endswith('.xml'):
    manage_xml_file(filename, logger, threads=threads)
else:
    print "Cannot handle file %s" % filename

You can see the updated version of this script on GitHub.

I hope this example serves as an introduction to the wonderful world of Python threadpools. There is a lot of flexibility and efficiency to be squeezed out of your scripts and projects by incorporating a few threads here and there. Sure, you'll get burned eventually, but it means you're a pro...

Kind of.

Let me know if you have any comments or complaints!


  [1] Note that Queue has been renamed to queue in Python 3.0. Also note that Python 2.6 comes with thread-safe implementations of LIFO, FIFO, and priority queues. Lucky bastards.
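     A quick taste at the interpreter (a sketch; both are drop-in, thread-safe Queue subclasses):

     >>> from Queue import LifoQueue, PriorityQueue
     >>> pq = PriorityQueue()
     >>> pq.put((2, 'later'))
     >>> pq.put((1, 'sooner'))
     >>> pq.get()
     (1, 'sooner')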

  [2] You're sitting there thinking "This is useless. No one will ever do this." And I raise you this past weekend, where I did exactly this...

  [3] I learned this lesson trying to parse the 50 meg JMDict with the pure Python solution. It took a very long time. Then I switched the import and it took maybe thirty seconds. Son of a bitch.