Python Threading with External Processes

Introduction

When you write Python scripts, you will often notice that the iterations of a loop do not depend on each other, so they could run in parallel. In its simplest form, Python offers ready-made threads and locks for this. There are two distinct ways to parallelize work in Python: multiprocessing (with the multiprocessing package) and threading (with the threading package). The pros and cons are excellently summarized by Jeremy Brown on Stack Overflow. This post only covers a toy example that uses the threading package.

Threading package

The threading package uses the threads of the underlying OS, so one job can be in progress while other jobs are already being started. Note that the package does not circumvent the so-called Global Interpreter Lock (GIL), which prevents Python threads from running truly in parallel on the same interpreter: only one thread can execute Python bytecode at a time. The lock is released while a thread waits for blocking I/O or for an external process to finish, which is why this post focuses only on running external processes in parallel, for example a cmake command.
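To see why external processes are a good fit for threads, here is a minimal sketch. It is written for Python 3 and assumes the subprocess module as the way to launch the child process. Each thread blocks while its child process runs, so the children can run concurrently. The helper names run_sleeper and run_in_threads are made up for this illustration.

```python
import subprocess
import sys
import time
from threading import Thread


def run_sleeper(seconds, results, index):
    # Launch a child Python process that just sleeps; the calling thread
    # blocks in subprocess.call() until the child exits.
    code = 'import time; time.sleep({})'.format(seconds)
    results[index] = subprocess.call([sys.executable, '-c', code])


def run_in_threads(count, seconds):
    # One thread per child process; each slot of `results` is written by
    # exactly one thread, so no lock is needed here.
    results = [None] * count
    threads = [Thread(target=run_sleeper, args=(seconds, results, i))
               for i in range(count)]
    start = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, time.time() - start


if __name__ == '__main__':
    codes, elapsed = run_in_threads(4, 0.5)
    print('exit codes: {}, elapsed: {:.2f}s'.format(codes, elapsed))
```

If the four children ran one after another, this would take at least two seconds. With threads the elapsed time should stay close to that of a single child, though the exact number depends on the machine.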

Parallel CMake Generator

The following example is a script that checks the subdirectories of the current directory for CMake projects by looking for a CMakeLists.txt. For each project it creates a matching folder under build and executes the corresponding cmake command. It starts one worker thread per core reported by the operating system. The code is structured so that every cmake invocation is handled by a Job that is inserted into a Queue, which is worked off by Workers.

#!/usr/bin/python

import os
import Queue
import commands
from multiprocessing import cpu_count
from threading import Thread, Lock

class Worker(Thread):
    def __init__(self, queue):
        self.queue = queue
        super(Worker, self).__init__()

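    def run(self):
        # Checking for an empty queue first would race with the other
        # workers, so just try to fetch a job and stop once none is left.
        while True:
            try:
                job = self.queue.get_nowait()
            except Queue.Empty:
                return
            try:
                job.execute()
            finally:
                # Always mark the job as done so that queue.join() cannot hang
                self.queue.task_done()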

class Job(object):
    def __init__(self, app_name, stdout_mutex):
        self.app_name = app_name
        self.stdout_mutex = stdout_mutex

    def execute(self):
        # Actual Work
        cmake_cmd = commands.getoutput('cmake -Bbuild/{} -H{} -DCMAKE_BUILD_TYPE=Debug' \
            .format(self.app_name, self.app_name))

        with self.stdout_mutex:
            # Output printing
            print '----- GENERATED CMAKE for {}-----'.format(self.app_name)

            # Check if cmake was successful or not
            if 'CMake Error' in cmake_cmd:
                print '==> Failed'
            else:
                print '==> Success'

if __name__ == "__main__":
    # Use one worker per core reported by the operating system
    cpu_used = cpu_count()

    # Define stdout_mutex and fill up queue of jobs
    stdout_mutex = Lock()
    cmake_queue = Queue.Queue()

    for app_name in filter(os.path.isdir, os.listdir(os.getcwd())):
        if os.path.exists(os.path.join(app_name, 'CMakeLists.txt')):
            # Create directories and fill job queues
            os.makedirs('build/' + app_name)
            cmake_queue.put(Job(app_name, stdout_mutex))

    # Work on cmake
    for _ in range(cpu_used):
        Worker(cmake_queue).start()
    cmake_queue.join()

    print '\n==> Finished generating Makefiles <==\n'

Explanation

Worker

The worker gets the job done. It is a subclass of Thread: calling start launches the thread, which in turn calls run, the method you want to override. The constructor receives the shared job queue; in run we take a job from the queue, execute it, and tell the queue that the job is done. Note that checking whether the queue is empty before fetching a job would be a race condition: while you are checking, another worker might already take the last job. The best way is therefore to simply try to fetch a job and catch the Queue.Empty exception.
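The pattern can be tried in isolation. The following is a minimal sketch for Python 3, where the Queue module is named queue. The jobs are plain callables, and the names drain and run_workers are invented for this example. It marks each job as done in a finally block so that join cannot hang when a job raises.

```python
import queue
from threading import Thread, Lock


def drain(jobs, results, results_lock):
    # Never ask "is the queue empty?" first: another worker could take the
    # last job between the check and the get. Just try, and stop on Empty.
    while True:
        try:
            job = jobs.get_nowait()
        except queue.Empty:
            return
        try:
            value = job()
            with results_lock:
                results.append(value)
        finally:
            # Mark the job as done even if it raised.
            jobs.task_done()


def run_workers(callables, worker_count):
    jobs = queue.Queue()
    for func in callables:
        jobs.put(func)
    results, results_lock = [], Lock()
    for _ in range(worker_count):
        Thread(target=drain, args=(jobs, results, results_lock)).start()
    jobs.join()  # blocks until task_done() was called once per job
    return results


if __name__ == '__main__':
    squares = run_workers([lambda n=n: n * n for n in range(10)], 4)
    print(sorted(squares))
```

The results arrive in whatever order the workers finish, which is why the usage example sorts them before printing.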

Job

The job object simply packages the code to execute so that it can be put in a queue. In execute, we use the commands module to invoke cmake on the specific application. Only afterwards do we acquire a mutex, so that the result is printed without being interleaved with the output of other workers while the cmake runs themselves still overlap.
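The commands module only exists in Python 2. As a rough sketch of the same job for Python 3, assuming subprocess as the replacement, the helper below runs a command and judges success by its exit code rather than by searching the output for 'CMake Error'. The function names build_cmake_command and run_command are hypothetical; the cmake arguments are the ones used in the script.

```python
import subprocess
import sys


def build_cmake_command(app_name, build_type='Debug'):
    # Same flags as in the script: -B is the build dir, -H the source dir.
    return ['cmake',
            '-Bbuild/{}'.format(app_name),
            '-H{}'.format(app_name),
            '-DCMAKE_BUILD_TYPE={}'.format(build_type)]


def run_command(cmd):
    # Capture stdout and stderr together, as commands.getoutput did.
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True)
    except OSError as error:
        # The executable (e.g. cmake) is not installed or not on the PATH.
        return False, str(error)
    output, _ = proc.communicate()
    return proc.returncode == 0, output


if __name__ == '__main__':
    # Stand-in command so the sketch runs without cmake installed.
    ok, text = run_command([sys.executable, '-c', 'print("generated")'])
    print('==> Success' if ok else '==> Failed')
```

Passing the command as a list avoids going through a shell, so directory names containing spaces need no quoting.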

Main code

With the introduction above, the code is pretty much self-explanatory. There are just two details. First, os.makedirs will fail if the directory already exists, which can easily be circumvented, for example by catching the resulting OSError. Second, we use the queue's join method to determine whether all jobs have been done; this method blocks until every job in the queue has been marked as done.
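One way to handle the first detail is to catch the error and ignore it only when the directory is already there. This is a small sketch that should work on both Python 2 and Python 3; the helper name ensure_dir is made up for this example. On Python 3.2 and later, os.makedirs(path, exist_ok=True) achieves the same.

```python
import errno
import os
import tempfile


def ensure_dir(path):
    # Create path (including parents); an existing directory is not an error.
    try:
        os.makedirs(path)
    except OSError as error:
        # Re-raise anything that is not "already exists as a directory".
        if error.errno != errno.EEXIST or not os.path.isdir(path):
            raise


if __name__ == '__main__':
    root = tempfile.mkdtemp()
    target = os.path.join(root, 'build', 'my_app')
    ensure_dir(target)
    ensure_dir(target)  # second call is a no-op instead of an OSError
    print(os.path.isdir(target))
```

Catching the error instead of checking os.path.exists beforehand avoids a second race: the directory could be created by someone else between the check and the call.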