Using Threadpools in Python
Like most programmers, I take a secret pleasure in using threads in my projects. Oh, I know minor oversights can introduce deadlocks which will wreak hard-to-diagnose havoc, but let's face it, threads are the gateway to delirious daydreams about how effectively your new script utilizes multiple cores... or would utilize them, if anyone were actually using it.
Fortunately for those of us seeking that misguided high, Python makes putting together a sane thread pool quick and easy. All you need is the standard library modules Queue and threading.[1] Queue.Queue is a thread-safe queue, and threading.Thread provides a simple interface for creating threads.
Let's look at an example of safely using threads in Python.
Processing a CSV File with a Thread Pool
This script will walk through a comma-separated-values file and use a thread pool to process each row it reads.
from __future__ import with_statement
from time import sleep
from csv import DictReader
from Queue import Queue
from threading import Thread

q = Queue()
workers = []

def worker():
    while True:
        line = q.get()
        print "processing: %s\n\n" % line
        q.task_done()

for i in range(10):
    t = Thread(target=worker)
    t.setDaemon(True)
    workers.append(t)
    t.start()

with open('myfile.csv', 'r') as fin:
    for line in DictReader(fin):
        q.put(line)

q.join()
A few notes:
- This is Python 2.5; you'd want to drop the from __future__ import for Python 2.6. For Py3k you'd switch the from Queue import Queue to from queue import Queue, as well as adding parens to print. (Did I miss anything? Haven't actually used Py3k yet...)
- We're using csv.DictReader instead of csv.reader because it returns a dictionary of key=>value mappings (treating the very first row as the keys), as opposed to csv.reader, which returns each row as a plain list of values (see the sketch just after these notes).
- You don't have to keep the threads in a list (the workers list above), but it's an easy way to keep references to them.
- I use range here, as it is the last function standing in Py3k, but it would certainly be more appropriate to use xrange for Py2.x code.
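To make the DictReader note concrete, here's a minimal sketch of the difference; the people.csv file and its name,age header row are hypothetical:

from csv import reader, DictReader

# csv.reader yields each row as a plain list, header row included:
#   ['name', 'age'], ['alice', '31'], ...
for row in reader(open('people.csv', 'rb')):
    print row

# csv.DictReader treats the first row as keys and yields dicts:
#   {'name': 'alice', 'age': '31'}, ...
for row in DictReader(open('people.csv', 'rb')):
    print row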
We can make the above script a bit more robust with ample usage of the logging and optparse modules.
from __future__ import with_statement
import logging, sys
from optparse import OptionParser
from time import sleep
from csv import DictReader
from Queue import Queue
from threading import Thread

def manage_csv_file(file, logger, threads=10):
    q = Queue()
    workers = []

    def worker():
        while True:
            line = q.get()
            try:
                print "processing: %s\n" % line
            except Exception, e:
                logger.error("worker-thread: %s" % e)
            q.task_done()

    for i in range(threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        workers.append(t)
        t.start()

    with open(file, 'r') as fin:
        for line in DictReader(fin):
            q.put(line)

    q.join()
def main():
    p = OptionParser("usage: pool.py somefile.csv")
    p.add_option('-t', '--threads', dest='threads',
                 help='quantity of THREADS for processing',
                 metavar='THREADS')
    (options, args) = p.parse_args()
    filename = args[0]
    threads = int(options.threads) if options.threads else 10

    loggerLevel = logging.DEBUG
    logger = logging.getLogger("FileManager")
    logger.setLevel(loggerLevel)
    ch = logging.StreamHandler()
    ch.setLevel(loggerLevel)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    if filename.endswith('csv'):
        manage_csv_file(filename, logger, threads=threads)
    else:
        print u"Cannot handle filename %s" % filename.split('.')[0]

if __name__ == '__main__':
    sys.exit(main())
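Assuming the script is saved as pool.py (as the usage string suggests), kicking it off looks something like this:

python pool.py myfile.csv -t 20

The -t flag is optional; leave it off and you get the default of ten worker threads.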
From there you can tweak things a bit to meet your needs and do some real work after parsing the data, something along the lines of the sketch below. Or you could extend the script to handle another format...
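For illustration only, here's roughly what swapping the print inside manage_csv_file's worker for real work might look like; the order_total column and the save_to_db call are made-up stand-ins for whatever your data and persistence layer actually are:

    def worker():
        while True:
            line = q.get()
            try:
                # 'order_total' is a hypothetical CSV column; save_to_db is a
                # stand-in for whatever storage call your project provides.
                total = float(line['order_total'])
                save_to_db(line, total)
            except Exception, e:
                logger.error("worker-thread: %s" % e)
            q.task_done()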
Adding Support for XML Processing
You might call the rest of this article contrived, but I'm brimming with freshly minted knowledge of ElementTree, so let's throw together a manage_xml_file function.
Let's say that we're getting an XML document that is in this format...
<list>
<item/>
<item/>
<item/>
</list>
...and that we want to process each item. But let's throw in a wrench. Let's say that we know we want to process the root's children, but that we don't even know the name of the root, or the name of the children. And, damn it, we want to do it fast and in a single pass. Let's get really crazy: let's say that we want to extract as many well-formed elements as possible even if the XML document eventually turns out to be malformed.[2]
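As a hypothetical example of that last requirement, imagine a feed that gets cut off mid-stream:

<list>
  <item>one</item>
  <item>two</item>
  <item>three

The code below still hands the first two well-formed items off to the pool before logging the parse error.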
First we need to add an import for ElementTree. For Python 2.x it's worth your time to try importing cElementTree if it's available.[3]
try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    from xml.etree import ElementTree
In Python 3.0
the distinction between C and Python modules is largely eliminated,
so this try...except
dance is unnecessary, and you can get away
with just one import.
from xml.etree import ElementTree
Now let's get our hands dirty.
def manage_xml_file(file, logger, threads=10):
    def serialize(e):
        'Recursively serialize an ElementTree.Element'
        tag = e.tag
        text = e.text
        attributes = e.items()
        kids = e.getchildren()
        if text and not (attributes or kids):
            return text
        else:
            d = {}
            if text:
                d['_text'] = text
            for key, val in attributes:
                d[key] = val
            for kid in kids:
                tag = kid.tag
                serialized = serialize(kid)
                if d.has_key(tag):
                    val = d[tag]
                    if type(val) == type([]):
                        val.append(serialized)
                    else:
                        d[tag] = [val, serialized]
                else:
                    d[tag] = serialized
            return d

    q = Queue()
    workers = []

    def worker():
        while True:
            data = q.get()
            try:
                print "processing: %s\n" % data
            except Exception, e:
                logger.error("worker-thread: %s" % e)
            q.task_done()

    for i in range(threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        workers.append(t)
        t.start()

    item_tag = None
    reader = iter(ElementTree.iterparse(file, events=('start', 'end')))
    reader.next()  # discard the '<list>'-type start tag
    try:
        for event, elem in reader:
            if item_tag is None:
                item_tag = elem.tag
            if event == 'end' and elem.tag == item_tag:
                q.put(serialize(elem))
                elem.clear()
    except SyntaxError, e:
        logger.critical("encountered invalid xml, %s" % e)
    q.join()
A few notes on this script as well:
- I find it much more pleasant to serialize the elements into a native Python dictionary and then process that.
- Yes, the serialize function is a bit verbose. And yes, on a sufficiently nested XML element it would run into Python's limitations for recursion. I'm okay with that.
- It turns out that serializing XML to any combination of dictionaries and arrays is a bit of a black art, but I think that serialize gets it pretty much right (see the sketch just after these notes).
- iterparse is kind enough to tell us what line and character it fails on, which is more generous than many XML libraries, so we get to extract as much data as possible and know where the error is. You're (unfortunately) going to be hard pressed to get a whole lot more out of XML. iterparse is also the fastest XML processor for Python, which is a nice side-effect of its other nice aspects.
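To make that concrete: feed serialize an element like this hypothetical one (tag and attribute names invented for illustration),

<item id="5"><name>foo</name><tag>a</tag><tag>b</tag></item>

and what lands on the queue is roughly

{'id': '5', 'name': 'foo', 'tag': ['a', 'b']}

with repeated child tags collapsed into a list. (Whitespace between tags would also surface as a _text entry.)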
Then we'd need to update main to check if the file ends with xml and then apply manage_xml_file.
if filename.endswith('csv'):
    manage_csv_file(filename, logger, threads=threads)
elif filename.endswith('xml'):
    manage_xml_file(filename, logger, threads=threads)
else:
    print u"Cannot handle filename %s" % filename.split('.')[0]
You can see the updated version of this script on GitHub.
I hope this example serves as an introduction to the wonderful world of Python threadpools. There is a lot of flexibility and efficiency to be squeezed out of your scripts and projects by incorporating a few threads here and there. Sure, you'll get burned eventually, but it means you're a pro...
Kind of.
Let me know if you have any comments or complaints!
[1] Note that Queue has been renamed to queue in Python 3.0. Also note that Python 2.6 comes with threadsafe implementations of Lifo, Fifo and Priority queues. Lucky bastards.
[2] You're sitting there thinking "This is useless. No one will ever do this." And I raise you this past weekend where I did exactly this...
[3] I learned this lesson trying to parse the 50 meg JMDict with the pure Python solution. It took a very long time. Then I switched the import and it took maybe thirty seconds. Son of a bitch.