The Basics of Python Multithreading and Queues

I’ve never been a fan of programmer-speak. It sometimes feels like people make code, processes and even documentation opaque on purpose.

Multithreading in Python, for example. Or how to use Queues.

So here’s something for myself next time I need a refresher. It’s the bare-bones concepts of Queuing and Threading in Python.

Let’s start with Queuing in Python.

Before you do anything else, import Queue. (That’s Python 2; in Python 3 the module is lowercase, from queue import Queue, as a couple of commenters point out below.)

from Queue import Queue

A queue is kind of like a list:

my_list = []
my_list.append(1)
my_list.append(2)
my_list.append(3)
print my_list.pop(0)
# Outputs: 1

The above code creates a list, appends three values to it, then removes and returns the first value, so the list now holds only two values (2 and 3).

my_queue = Queue(maxsize=0)
my_queue.put(1)
my_queue.put(2)
my_queue.put(3)
print my_queue.get()
my_queue.task_done()
# Outputs: 1

There are only a couple of differences in how queues work visually. First, we set a maximum size for the queue, where 0 means infinite. That sounds pointless, but a positive maxsize is genuinely useful: put() blocks once the queue is full, which stops a fast producer from flooding memory.
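
Here’s a minimal sketch of that behaviour, using put_nowait(), the non-blocking cousin of put():

from Queue import Queue, Full

bounded = Queue(maxsize=2)  # at most 2 items at a time
bounded.put(1)
bounded.put(2)
try:
    bounded.put_nowait(3)  # queue is full; a plain put() would block here instead
except Full:
    print "Queue is full, the producer has to wait"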

The second visual difference is the task_done() call at the end. That tells the queue that not only have I retrieved the item, I’ve finished with it. This matters more than it looks: q.join() counts every put() as an unfinished task, so if a get() isn’t paired with a task_done(), q.join() will block forever. So let’s just say in Queues, you have to call this.
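
A tiny sketch of that pairing:

q = Queue(maxsize=0)
q.put("some job")

item = q.get()
# ... do the actual work on item here ...
q.task_done()  # checks off the job we just pulled

q.join()  # returns immediately, since every put() has a matching task_done()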

The big important point about Queues is that they work really well with threading: a Queue handles all of the locking internally and lets threads block while they wait for work, which you just can’t do with a plain list. That’s why I’m even bothering to bring them up here.

Here’s an example of a simple program that uses Queues:

from Queue import Queue

def do_stuff(q):
  while not q.empty():
    print q.get()
    q.task_done()

q = Queue(maxsize=0)

for x in range(20):
  q.put(x)

do_stuff(q)

It outputs 0-19. In like the most complicated way possible to output 0-19.

Notice how do_stuff() is just whipping through the whole queue. That’s nice. But what if it were doing a big task, or a task that required a lot of waiting (like pulling data from APIs)? Assume, for example, that do_stuff() takes 30 seconds per item because it’s just waiting on slow APIs to return something. Run one item at a time, 20 items at 30 seconds each is 10 minutes. That’s really shitty.
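
Here’s a sketch of that worst case, with time.sleep() standing in for the slow API call:

import time
from Queue import Queue

def do_stuff(q):
  while not q.empty():
    item = q.get()
    time.sleep(30)  # pretend this is a slow API call
    print item
    q.task_done()

q = Queue(maxsize=0)
for x in range(20):
  q.put(x)

do_stuff(q)  # 20 items x 30 seconds each: about 10 minutes, one after another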

Enter Python Threading.

Start with importing the right stuff:

from Queue import Queue
from threading import Thread

Threads are probably really complex. Or so I’m led to believe. All you need to know for now, though, is that they use a worker function to get stuff done, they run at the same time, and you can pull them all together when they’re done. So first you need to set up a worker function:

def do_stuff(q):
  while True:
    print q.get()
    q.task_done()

We’re more or less just stealing the function from the last bit, except we’re setting it up as an infinite loop (while True). That just means I want my threads always ready to accept new tasks. It isn’t burning CPU in the meantime, either: q.get() blocks until something shows up in the queue.

Now I want to create the actual threads and set them running. Before I do that, though, I need to give them a Queue to work with. The Queue doesn’t have to have anything on it, it just needs to be defined so that my threads know what they’ll be working on. Here’s how I set my (10) threads running:

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
  worker = Thread(target=do_stuff, args=(q,))
  worker.setDaemon(True)
  worker.start()

So you see the Queue set up (as “q”), then I define a loop to run the thread-creation bits 10 times. The first line in the loop sets up a thread, points it at the do_stuff function, and passes it “q”, the Queue we just defined. The setDaemon(True) line means the thread will be killed when the main program exits (more on that in the comments), and then we start the bugger. That’s 10 threads running (remember the infinite loop in do_stuff()?) and waiting for me to put something in the Queue.

The rest of the code is the same as the Queue example so I’m just going to put it all together and let you figure it out:

from Queue import Queue
from threading import Thread

def do_stuff(q):
  while True:
    print q.get()
    q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
  worker = Thread(target=do_stuff, args=(q,))
  worker.setDaemon(True)
  worker.start()

for x in range(100):
  q.put(x)

q.join()

The only bit that should be new is the q.join() bit right at the end. This basically just waits until the queue is empty and all of the threads are done working (which it knows because task_done() will have been called on every element of the queue). If you were running a program in batches, you might use q.join() to wait for the batch to finish and then write the results to a file, and then just throw more tasks into the queue.

Consider revising the last 3 lines into a loop:

for y in range(10):
  for x in range(100):
    q.put(x + y * 100)
  q.join()
  print "Batch " + str(y) + " Done"

It’s cool that items can get added to the Queue willy-nilly and these Threads will just pick them up, and whenever I want I can stop and join all of them together for a second so I can check in: maybe write to a file or database, or just let the user know that I’m still working away.

Remember the example I gave before about each run of do_stuff() taking 30 seconds? And since I had to run it 20 times it’d take 10 minutes? Now I can just run 20 different threads and the whole program will be done in about 30 seconds rather than 10 minutes. Obviously your results may vary, but it’s definitely faster.
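
Here’s a sketch of that same scenario with threads, again using time.sleep() as the stand-in API call. With one thread per item, all 20 waits overlap:

import time
from Queue import Queue
from threading import Thread

def do_stuff(q):
  while True:
    item = q.get()
    time.sleep(30)  # the same fake 30-second API call
    q.task_done()

q = Queue(maxsize=0)
for i in range(20):  # one thread per item
  worker = Thread(target=do_stuff, args=(q,))
  worker.setDaemon(True)
  worker.start()

start = time.time()
for x in range(20):
  q.put(x)
q.join()
print "Done in about %i seconds" % (time.time() - start)  # ~30, not 600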

In any case, hope this helped. If you want the nitty gritty details, go read the documentation. This should get you started though.

52 thoughts on “The Basics of Python Multithreading and Queues”

  1. You have a fancy website here, but just as an aside, your css breaks on Chromium in Linux (Chromium 37, Ubuntu 13.10). It shows as white with black drop shadow making it impossible to read.

      1. years later, still hard to read. Windows Chrome, light gray text on white background. The disqus section has normal black text and is easier to read than the article :)

    1. use class and def to do that:

      class xxx:
          def __init__(self):
              # main stuff
              self.values = []
              # set queue here

          def worker(self, q):
              response = q.get()
              # process queue and build the return value
              self.values.append(response)

      but it’s better to acquire a thread lock before appending to the list, then release it

    2. It’d probably be smart for me to write an update for that, since it’s the obvious next step. I’m always a bit ashamed of showing my Python code since I’m 100% sure everyone else does it better.

      Personally I have a class that I created which does all of my queueing/threading so I don’t have to think about it much. It stores all results and errors in queues. It takes a callback function as an argument, and that function needs to have the form my_callback(results_list, errors_list). So when the whole shebang finishes, I dump the result and error queues into lists and send them to the callback function.

      Here’s my dump_queue function, which I’m sure is ugly as hell:

      def dump_queue(self, queue):
          """
          Empties all pending items in a queue and returns them in a list.
          """
          result = []
          queue.put("STOP")
          for i in iter(queue.get, "STOP"):
              result.append(i)
          time.sleep(.1)
          return result

    1. I fixed it! If it matters, I was using a plugin called Prism WP which wasn’t being maintained. I switched to Crayon, which looks like it gets a lot of love. I didn’t even have to update the tags, just set the fallback language to Python and it works for most of my tech articles.

      Hope it’s easier to see now!

  2. Hey, finally a good example on what I needed! I don’t have enough fingers to count how many stackoverflow pages I read on this topic to no avail.

    Thanks!

    1. You’re welcome :) One of these days I’ll throw up some examples of how I use the technique. There’s still some stuff that’s not obvious, but it’s a start!

  3. Many thanks for this, it really helped a lot! If anyone ever uses this for multiple clients hitting a server, running concurrent MySQL queries and getting “commands out of sync” errors, threading will work. The code below may help someone. For each piece of data received, the key is to make an instance of the database class per thread.

    def query_handler(q, conn, mysockets, clients, PORTS):
        db = db_class()  # one database instance per thread
        while True:
            data = q.get()

            # do stuff

            q.task_done()
            conn.send(json.dumps(reply))

    data = json.loads(conn.recv(pow(2, K)))

    if data:
        q = Queue(maxsize=0)

        worker = Thread(target=query_handler, args=(q, conn, mysockets, clients, PORTS))
        worker.setDaemon(True)
        worker.start()

        q.put(data)
        q.join()

    1. This is neat :) I’ll have to dig into it. What I’ve generally been doing is batching my DB writes, so instead of writing within the worker, I just wait until a batch is done and write from that batch. That way I don’t have to worry about workers stepping on each other’s toes. It’s super dependent on your application though, since I’m obviously creating a bottleneck with my method.

    1. Hey Alex! I’m a bit late, but what q.join() does is wait for all of the threads to call task_done() before continuing its work. So if you look at the last code example, the outside loop goes through 10 “batches,” where each batch gives some work to the workers, waits for them to do their job, and when they’re done (q.join()), goes into the next iteration of the loop.

  4. Hi, there is an error in the first block of code:
    “The above code creates a list, assigns it three values, then removes the first value in so the list now has only 2 values (which are 2 and 3).”

    ——–

    pop() actually returns/removes the last item, which in this case is `3`

    1. my_list.pop() does indeed do that, but in our case we’re using my_list.pop(0), which removes and returns the element in position 0 (the first we added, so 1).

      https://docs.python.org/2/tutorial/datastructures.html

      This is why I don’t usually show my code to people though ;) Pop has always had the connotation of removing the last item in a list, and I’m sure there’s a method to remove the first item in a list (.popleft()? though that one’s on collections.deque) and I’m just being weird.

  5. Thanks for the article; I’m new to Python as a language but have written a few background jobs in it. I agree most Python tutorials on the net are horribly explained; it’s not that difficult if it’s explained properly. I just wanted to know how I’d run this as a daemon (because my queue is being fed from SQL and needs to wait for items to be added to the table, which varies depending on requests) which does not quit, and simply does a routine check for any new items to queue and pops them on as they come in. How would I do this?

    1. Hey :) Technically the Daemon line in the sample code (we’re setting it to True) means, “when the main program (thread) ends, feel free to kill all daemons.” If we set our threads to worker.setDaemon(False), the program would wait for all of our threads to be explicitly closed before allowing the program to exit. See http://stackoverflow.com/questions/190010/daemon-threads-explanation and https://docs.python.org/2/library/threading.html under 16.2.1. Thread Objects.

      You could technically keep your program alive and feed it via SQL checks at intervals, but there’s no real reason to keep our worker threads alive between those checks IMHO. The overhead for starting/stopping them is minimal.

      If I were you what I’d do is use APScheduler (see https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py and http://stackoverflow.com/questions/22715086/scheduling-python-script-to-run-every-hour-accurately) to periodically check for new items in your database, and if they’re around just pop into your multithreaded script to execute. I’d keep Daemon=True, which will kill all of the threads once they’re done working; they just start up again when you come back with your check.

      Keep in mind that daemonic threads don’t end super gracefully, but they work well if you’re using them in the context of this article, e.g. we’re explicitly “joining” them and we know when we’re done with them. This is also why I join threads before writing any data anywhere, because if you write using the daemon thread you might accidentally (and hypothetically, I haven’t tried it) cut it off mid-action and not properly write the data or close the file if applicable.
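
      Here’s a tiny sketch of the daemon difference:

      from threading import Thread
      import time

      def background_worker():
          while True:
              time.sleep(1)  # pretend to poll for work forever

      t = Thread(target=background_worker)
      t.setDaemon(True)  # with False here, the program would hang at exit forever
      t.start()

      print "Main thread finished; the daemon thread dies with it"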

  6. I’m bad at / hate troubleshooting technical issues, so can’t really help you with the APScheduler install :(

    For the methodology, I’m not suggesting using APScheduler to check the queue. APScheduler would run a script (Checker.py) that does something like this:

    1) Check the SQL database to see if anything is new.
    2) If so, run Runner.py (or w/e)

    Runner.py does this:
    1) Checks the SQL database again (just for code segmentation, really) and pulls out the desired rows.
    2) Adds the data from those rows to the queue.
    3) Starts threads with assigned workers.
    4) Joins all threads when complete and updates the SQL database.
    5) Stops running, which implicitly kills all threads.

    Now what happens is that every X time, Checker.py runs. If Checker.py decides there’s work to do, it initiates Runner.py, which performs that work. Everything ends and no Python is running in the intermediate time. Then the process starts again.
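
    A rough sketch of the Checker.py side, assuming APScheduler 3.x (check_for_new_rows and run_batch are hypothetical placeholders for the SQL check and the threaded Runner.py logic):

    from apscheduler.schedulers.blocking import BlockingScheduler

    def checker():
        if check_for_new_rows():  # hypothetical: query SQL for unprocessed rows
            run_batch()           # hypothetical: the threaded Runner.py logic

    sched = BlockingScheduler()
    sched.add_job(checker, 'interval', minutes=5)  # run the check every 5 minutes
    sched.start()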

    I’m actually writing an article to demonstrate this since it was an interesting question :) Until then, I hope this helps!

    1. Hey Troy, no worries about the debugging; the test box is running Python 2.6.6 and was installed a while ago, so I’m going to test on an Ubuntu instance instead.

      Thanks so much for your help, looking forward to your article. FYI my queue contains a list of URL+postdata to be CURL’d

  7. Rohan Sachdeva

    Great post Troy, I have some doubts:

    1. If you’ve used ThreadPoolExecutor, is this implementation of threads with queues still required?
    2. As you have used “while True” in the thread routine, how does the thread know when to exit? Is q.join() making it do that?

    Thanks in advance!

    1. Hey Rohan,

      1) ThreadPoolExecutor works pretty much the same way as this script, meaning it still requires queues, workers and all that goodness. If I rewrote this entire script using it, we’d have about three fewer lines of code (mostly the starting of workers). It seems neat though? I guess I’ll have to mess around with it some more, but it doesn’t seem like it abstracts enough of the process for me to learn its internals.

      2) The workers will keep running until the program ends, in this case. q.join is more or less, “wait until all running workers report task_done().” So q.join isn’t ending the threads, rather it’s waiting until they’re done working. I can then just throw more work into them as per the “batching” example I provided in the post. I’ll quote a previous answer since it was fairly complete :) See cpt_citizen’s question if you want some more content:

      Technically the Daemon line in the sample code (we’re setting it to True) means, “when the main program (thread) ends, feel free to kill all daemons.” If we set our threads to worker.setDaemon(False), the program would wait for all of our threads to be explicitly closed before allowing the program to exit. See http://stackoverflow.com/questions/190010/daemon-threads-explanation and https://docs.python.org/2/library/threading.html under 16.2.1. Thread Objects.
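
      And to round out point 1, here’s a rough sketch of what the ThreadPoolExecutor version looks like (concurrent.futures in Python 3, or the “futures” backport on Python 2); the pool manages the worker threads and the queue for you:

      from concurrent.futures import ThreadPoolExecutor

      def do_stuff(item):
          return item * 2  # stand-in for real work

      with ThreadPoolExecutor(max_workers=10) as pool:
          results = list(pool.map(do_stuff, range(100)))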

  8. For anyone coming in from Python 3
    from queue import PriorityQueue
    my_queue = PriorityQueue(maxsize=0)
    my_queue.put(1)
    my_queue.put(2)

    print(my_queue.get())
    my_queue.task_done()

  9. Thank you for a nice and easy step-by-step explanation of what each piece of code means, and for building it up from scratch. Extremely useful!

  10. Thanks for your post. It’s very helpful.
    I have a question, please: I would like to read a file in an infinite loop to simulate real-time streaming. So I thought about using a queue which contains the data from this file, in order to process the flow with sliding windows.
    My question is: is this idea feasible, and how can I use multithreading with an infinite flow?

    1. Hey IBK!

      I’m sure you can do something to that effect using loops, but you’d probably want to add a pause in there somewhere. If I were throwing a file read in there, I’d do it around the q.join() part of the loop: if the file comes up empty, wait 5-15 seconds; if the file has items, add them to the queue and empty out the file. There’s a sketch below.

      I’m not sure what the impact is on a processor to have this run empty forever. I really built this to run on scripts under 4 hours, once per project. So I’m not sure how it operates under stress and over time.
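
      A rough sketch of what I mean, assuming q and its worker threads are set up as in the post (read_new_items is a hypothetical helper that returns any new lines from the file and empties it):

      import time

      while True:
          items = read_new_items("stream.txt")  # hypothetical file-reading helper
          if not items:
              time.sleep(10)  # file was empty; pause before checking again
              continue
          for item in items:
              q.put(item)
          q.join()  # wait here until the worker threads finish this batch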

      1. Thanks for your reply.
        If I’ve understood you correctly, first I put the file’s contents in the queue, then I pause, then I start reading the file again, and so on... That way I handle the case of the empty file and I get a simulation of an infinite flow.
        But I can’t understand how I’m going to apply it around the q.join(), or how I specify the number of threads to use.
        (Excuse me, this is the first time I’ve used these concepts.)

  11. A bit confused here. I understood the concept, but could you explain the reasoning behind adding to the queue after you’ve already set the threads to work with start()?

    1. Hey Geo :) Think about it this way. Once we set them running, the threads chew through all of the tasks in the queue and then block in q.get(), waiting for something new. So when we add something to the queue, they’ll chew through it too. We can keep adding to the queue and the worker threads will keep picking items up; when the queue is empty, they just wait for new tasks. Hope that helps!

      1. Yea, helps a bit. I was thinking about that, but I was wondering: if that’s the case, do they basically run forever? How do they know when it’s time to terminate? Or is the idea actually to make them run forever?

        1. Yeah, that’s not exactly apparent from the code. Take a look at my response to Rohan in the comments below on the line setDaemon(True), which should help you understand that.

          In short, when the program logically hits the end of the main thread, all of the daemon threads will die. This behaviour would change if you set setDaemon to False instead of True; I linked to the relevant docs in that comment.

  12. Hey Troy – Thanks for the tutorial. I am writing a web crawler script and this was very helpful for me. I had one question though. In this line:

    worker = Thread(target=do_stuff, args=(q,))

    Let’s say that “do_stuff” was a function that returned a value that I wanted to append to a list. How would I get the return value from it?

    1. Hey Jesse,

      This is what I found in the last threading program I wrote. It’s ugly as hell and I can’t even remember if it works as-is. But you can see that I pass in three queues: the source queue, the results queue, and an error queue. Hope this helps! And report back on if this thing actually works still ;)

      from queue import Queue
      from threading import Thread
      import time

      class StandardQueue:
          def __init__(self, worker_function, callback_function=None, num_threads=5, batch_size=25):
              self.worker_function = worker_function
              self.callback_function = callback_function
              self.num_threads = num_threads
              self.batch_size = batch_size

          def run(self, iterable_list):
              q = Queue(maxsize=0)
              results_queue = Queue(maxsize=0)
              error_queue = Queue(maxsize=0)

              # Set up the batches
              the_end = len(iterable_list)
              x = 0
              batch = 1
              max_batch = int(the_end / self.batch_size) + 1

              # Start threads running
              for i in range(self.num_threads):
                  worker = Thread(target=self.worker_function, args=(q, results_queue, error_queue,))
                  worker.setDaemon(True)
                  worker.start()

              # Do all of this in batches
              for this_list_batch in self.make_batch(iterable_list, self.batch_size):
                  # Output UI info
                  print("[Batch: " + str(batch) + " / " + str(max_batch) + " Started.]")
                  batch = batch + 1

                  for y in this_list_batch:
                      q.put(y)

                  # This waits until all threads finish
                  q.join()

                  if self.callback_function:
                      these_results = self.dump_queue(results_queue)
                      these_errors = self.dump_queue(error_queue)
                      # Combine Batch Results
                      self.callback_function(these_results, these_errors)

              if not self.callback_function:
                  these_results = self.dump_queue(results_queue)
                  these_errors = self.dump_queue(error_queue)
                  return these_results, these_errors

          def make_batch(self, iterable, n=1):
              l = len(iterable)
              for ndx in range(0, l, n):
                  yield iterable[ndx:min(ndx + n, l)]

          def dump_queue(self, queue):
              """
              Empties all pending items in a queue and returns them in a list.
              """
              result = []
              queue.put("STOP")

              for i in iter(queue.get, "STOP"):
                  result.append(i)
              time.sleep(.1)
              return result

          # def sq_worker_function(self, q, results_queue, error_queue):
          #     """
          #     """
          #     while True:
          #         x = q.get()
          #         # Do something with it.
          #         error_queue.put(stuff)
          #         results_queue.put(stuff)
          #         q.task_done()

          # def sq_callback_function(self, results, errors):
          #     """
          #     """
          #     for x in results:
          #         pass
          #     for x in errors:
          #         pass

  13. @troyfawkes:disqus this is indeed a very clean writeup.
    There is a typo while importing queues: the correct syntax (in Python 3) is ‘from queue import Queue’.
