February 10, 2009.
Like most programmers, I take a secret pleasure in using threads in my projects. Oh, I know minor oversights can introduce deadlocks which will wreak hard-to-diagnose havoc, but let's face it, threads are the gateway to delirious daydreams about how effectively your new script utilizes multiple cores... or would utilize them, if anyone was actually using it.
Fortunately for those of us seeking that misguided high, Python makes putting together a sane thread pool quick and easy. All you need is the standard library modules Queue and threading.1 Queue.Queue is a thread-safe queue, and threading.Thread provides a simple interface for creating threads.
Let's look at an example of safely using threads in Python. This script walks through a comma-separated-values file and uses a thread pool to process each row.
from __future__ import with_statement
from csv import DictReader
from Queue import Queue
from threading import Thread

q = Queue()
workers = []

def worker():
    # Each worker pulls rows off the shared queue forever.
    while True:
        line = q.get()
        print "processing: %s\n\n" % line
        q.task_done()

# Spin up a pool of ten daemon threads.
for i in range(10):
    t = Thread(target=worker)
    t.setDaemon(True)
    workers.append(t)
    t.start()

# Feed the queue, then block until every row has been processed.
with open('myfile.csv', 'r') as fin:
    for line in DictReader(fin):
        q.put(line)
q.join()
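One design note before moving on: the workers above are daemon threads stuck in an infinite loop, so the script simply exits after q.join() and the threads die with it. If you'd rather shut the pool down explicitly, a common variation (sketched here from memory, not part of the original script) is to feed each worker a sentinel:

from __future__ import with_statement
from csv import DictReader
from Queue import Queue
from threading import Thread

q = Queue()
workers = []

def worker():
    while True:
        line = q.get()
        if line is None:    # sentinel: time to go home
            q.task_done()
            break
        print "processing: %s\n\n" % line
        q.task_done()

for i in range(10):
    t = Thread(target=worker)   # no setDaemon(True) needed this time
    workers.append(t)
    t.start()

with open('myfile.csv', 'r') as fin:
    for line in DictReader(fin):
        q.put(line)

for t in workers:
    q.put(None)                 # one sentinel per worker
q.join()
for t in workers:
    t.join()                    # threads have now actually exited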
A few notes:
- This is Python 2.5; you'd want to drop the from __future__ import for Python 2.6. For Py3k you'd switch the from Queue import Queue to from queue import Queue, as well as adding parens to print. (Did I miss anything? Haven't actually used Py3k yet... there's a hedged sketch of the port after this list.)
- We're using csv.DictReader instead of csv.reader because it returns each row as a dictionary of key=>value mappings (treating the very first row as the keys), as opposed to csv.reader, which returns each row as a plain list of values.
- You don't have to keep the threads in a list (embodied by workers above), but it's an easy way to keep references to them.
- I use range here, as it is the last-function-standing in Py3k, but certainly it would be more appropriate to use xrange for Py2.x code.
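For what it's worth, here's a minimal sketch of what that Py3k port might look like, assuming nothing beyond the renames mentioned above (I haven't actually run this under 3.0, so treat it as a best guess):

# Hypothetical Python 3 version of the pool above: queue is lowercase,
# print is a function, and the __future__ import goes away.
from csv import DictReader
from queue import Queue
from threading import Thread

q = Queue()

def worker():
    while True:
        line = q.get()
        print("processing: %s\n" % line)
        q.task_done()

for i in range(10):
    t = Thread(target=worker)
    t.daemon = True  # attribute form; setDaemon() still works too
    t.start()

with open('myfile.csv', 'r') as fin:
    for line in DictReader(fin):
        q.put(line)
q.join()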
We can make the above script a bit more robust with ample usage of the logging and optparse modules.
from __future__ import with_statement
import logging, sys
from optparse import OptionParser
from csv import DictReader
from Queue import Queue
from threading import Thread

def manage_csv_file(file, logger, threads=10):
    q = Queue()
    workers = []
    def worker():
        while True:
            line = q.get()
            try:
                print "processing: %s\n" % line
            except Exception, e:
                logger.error("worker-thread: %s" % e)
            q.task_done()
    # Build the pool of daemon worker threads.
    for i in range(threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        workers.append(t)
        t.start()
    with open(file, 'r') as fin:
        for line in DictReader(fin):
            q.put(line)
    q.join()

def main():
    p = OptionParser("usage: pool.py somefile.csv")
    p.add_option('-t', '--threads', dest='threads',
                 help='quantity of THREADS for processing',
                 metavar='THREADS')
    (options, args) = p.parse_args()
    filename = args[0]
    threads = int(options.threads) if options.threads else 10

    # Log to stderr with timestamps.
    loggerLevel = logging.DEBUG
    logger = logging.getLogger("FileManager")
    logger.setLevel(loggerLevel)
    ch = logging.StreamHandler()
    ch.setLevel(loggerLevel)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    if filename.endswith('csv'):
        manage_csv_file(filename, logger, threads=threads)
    else:
        print u"Cannot handle filename %s" % filename.split('.')[0]

if __name__ == '__main__':
    sys.exit(main())
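Assuming you've saved this as pool.py (that filename is just my guess from the usage string), you'd kick it off with something like:

python pool.py myfile.csv --threads 4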
From there you can tweak things a bit to do some real work after parsing the data. Or you could extend the script to handle another format...

You might call the rest of this article contrived, but I'm brimming with freshly minted knowledge of ElementTree, so let's throw together a manage_xml_file function.
Let's say that we're getting an XML document that is in this format...
<list>
  <item/>
  <item/>
  <item/>
</list>
...and that we want to process each item. But let's throw in a wrench. Let's say that we know we want to process the root's children, but that we don't even know the name of the root, or the name of the children. And, damn it, we want to do it fast and in a single pass. Let's get really crazy: let's say that we want to extract as many well-formed elements as possible even if the XML document eventually turns out to be malformed.2
First we need to add an import for ElementTree. For Python 2.x it's worth your time to try importing cElementTree if it's available.3
try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    from xml.etree import ElementTree
In Python 3.0 the distinction between C and Python modules is largely eliminated, so this try...except dance is unnecessary, and you can get away with just one import.
from xml.etree import ElementTree
Now let's get our hands dirty.
def manage_xml_file(file, logger, threads=10):
    def serialize(e):
        'Recursively serialize an ElementTree.Element into dicts/lists/strings'
        text = e.text
        attributes = e.items()
        kids = e.getchildren()
        if text and not (attributes or kids):
            return text
        else:
            d = {}
            if text:
                d['_text'] = text
            for key, val in attributes:
                d[key] = val
            for kid in kids:
                tag = kid.tag
                serialized = serialize(kid)
                if tag in d:
                    # Repeated child tags collapse into a list.
                    val = d[tag]
                    if isinstance(val, list):
                        val.append(serialized)
                    else:
                        d[tag] = [val, serialized]
                else:
                    d[tag] = serialized
            return d
    q = Queue()
    workers = []
    def worker():
        while True:
            data = q.get()
            try:
                print "processing: %s\n" % data
            except Exception, e:
                logger.error("worker-thread: %s" % e)
            q.task_done()
    for i in range(threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        workers.append(t)
        t.start()
    item_tag = None
    reader = iter(ElementTree.iterparse(file, events=('start', 'end')))
    reader.next()  # discard the root element's 'start' event
    try:
        for event, elem in reader:
            if item_tag is None:
                item_tag = elem.tag
            if event == 'end' and elem.tag == item_tag:
                q.put(serialize(elem))
                elem.clear()
    except SyntaxError, e:
        logger.critical("encountered invalid xml, %s" % e)
    q.join()
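To see the malformed-document behavior in action, here's a quick, hypothetical smoke test (the truncated string and StringIO usage are mine, and it assumes a logger configured as in main; the parse error surfaces as a SyntaxError at least under cElementTree). Since iterparse accepts file-like objects, the items that closed cleanly still reach the queue before the error is logged:

from StringIO import StringIO

# Two complete items, then the document is cut off mid-tag.
bad_xml = StringIO("<list><item>one</item><item>two</item><item>thr")
manage_xml_file(bad_xml, logger)
# 'one' and 'two' get processed; the worker never sees the third item,
# and the critical log line reports where parsing fell over.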
A few notes on this script as well:
- I find it much more pleasant to serialize the elements into a native Python dictionary and then process that.
- Yes, the serialize function is a bit verbose. And yes, on a sufficiently nested XML element it would run into Python's recursion limit. I'm okay with that.
- It turns out that serializing XML to any combination of dictionaries and lists is a bit of a black art, but I think that serialize gets it pretty much right (there's a small worked example after this list).
- iterparse is kind enough to tell us what line and character it fails on, which is more generous than many XML libraries, so we get to extract as much data as possible and know where the error is. You're (unfortunately) going to be hard pressed to get a whole lot more out of XML.
- iterparse is also the fastest XML processor for Python, which is a nice side-effect of its other nice aspects.
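For concreteness, here's roughly what serialize hands the workers, assuming you lift it out of manage_xml_file to poke at it (the sample element is made up): a text-only element comes back as a plain string, attributes become keys, and repeated child tags get folded into a list.

elem = ElementTree.fromstring(
    '<item id="1"><name>foo</name><name>bar</name></item>')
print serialize(elem)
# prints something like: {'id': '1', 'name': ['foo', 'bar']}
# while serialize(ElementTree.fromstring('<name>foo</name>'))
# returns just the string 'foo'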
Then we'd need to update main to check if files end with xml and then apply manage_xml_file.
if filename.endswith('csv'):
    manage_csv_file(filename, logger, threads=threads)
elif filename.endswith('xml'):
    manage_xml_file(filename, logger, threads=threads)
else:
    print u"Cannot handle filename %s" % filename.split('.')[0]
You can see the updated version of this script on GitHub.
I hope this example serves as an introduction to the wonderful world of Python threadpools. There is a lot of flexibility and efficiency to be squeezed out of your scripts and projects by incorporating a few threads here and there. Sure, you'll get burned eventually, but it means you're a pro...
Kind of.
Let me know if you have any comments or complaints!
1. Note that Queue has been renamed to queue in Python 3.0. Also note that Python 2.6 comes with thread-safe implementations of LIFO, FIFO, and priority queues. Lucky bastards.↩
2. You're sitting there thinking "This is useless. No one will ever do this." And I raise you this past weekend, where I did exactly this...↩
3. I learned this lesson trying to parse the 50 meg JMDict with the pure Python solution. It took a very long time. Then I switched the import and it took maybe thirty seconds. Son of a bitch.↩