Multi (Parallel)-Processing with Python

True Story Follows

Recently I was working on a problem in which I needed to process 2 Terabytes of data. This doesn’t sound like all that much to me unless I refer to it as 2,000 gigabytes, or about 1.7 million copies of “Moby Dick”. That’s enough copies of Moby Dick for every man, woman, and child in the city of Philadelphia, PA. And it’s also 375 times more copies than were printed during the author’s lifetime.

I was processing the data on a large EC2 instance which had 32 processors.

The Problem

In order to process all the data in a reasonable amount of time, I would need to leverage all of the processors on the machine. In practical terms, this was the first time parallelizing was an absolute necessity for me. And sure, you parallelize with asynchronous background tasks in web applications, but that's generally for scaling as your user base grows, not for an individual task that yields some finite output.

Furthermore, I could go install a task broker and messaging system like Celery and RabbitMQ, but leveraging all of the machine's processors should be within my immediate grasp.

At this point I also wished I'd paid better attention and grasped functional programming concepts back in Dr. Chris “Purely Functional Badass” Okasaki's class. Purely functional code has no mutable state, and therefore no side effects, and therefore no hidden dependencies between computations, which is why parallelizing across processors comes almost for free in functional languages.

But alas, I’m using Python. And I’m not going to learn Haskell just so I can read Moby Dick 1.7 million times.

The Code

The problem I was solving had a specific use case, so the code was tied to that case; I've broken out the generic parts to demonstrate here. For a simple example, let's just say I want to execute this really simple Python script multiple times in parallel across multiple processors:

# child_process.py -- sleeps for a random amount of time, standing in for real work
import time
import random

wait_time = random.random() * 10
print "Waiting for %s seconds" % wait_time
time.sleep(wait_time)

In hindsight, similar parallelization would be worth investing in for practical tasks like video processing that take minutes or hours to complete.

Anyway, I have two examples: one in which I want to create a pipeline, parallelizing tasks and then doing something else once they all finish, and one in which I'm just mapping a whole bunch of data and need to continuously launch more and more processes.

In either case, both take advantage of the following function definitions, so you can just read these and then accept them as fact:

import time
from subprocess import Popen


def _launch_new_process(args, argument_generator):
    ''' Appends the next generated argument to the base command and launches it '''
    process_args = args + [next(argument_generator)]
    print "Launching %s" % (" ".join(process_args))
    child_process = Popen(process_args)
    return child_process


def example_arg_generator():
    ''' Sample generator function: yields the strings "1" through "1000" '''
    for some_value in range(1, 1001):
        yield str(some_value)
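
Neither example calls these helpers directly like this, but just to show the moving parts, a one-off usage would look something like the following:

argument_generator = example_arg_generator()

# _launch_new_process prints the full command ("Launching python child_process.py 1")
# and returns the Popen object for the child it started
child = _launch_new_process("python child_process.py".split(), argument_generator)

# Popen objects expose wait(), which blocks until the child exits
child.wait()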

Continuous Processing

In the real-world case, I needed to process about 900 files. So rather than launch 32 processes and block until each one finished its respective task, I wanted to keep throwing any available processor at reading through files. This code is for that purpose:

def launch_fixed_processes(process_count, base_command, argument_generator):
    ''' Keeps up to process_count child processes running until the
        argument generator is exhausted, then waits for the stragglers '''
    active_child_processes = []
    args = base_command.split()
    while True:
        active_child_processes = _filter_processes_still_active(active_child_processes)
        if len(active_child_processes) < process_count:
            try:
                child_process = _launch_new_process(args, argument_generator)
            except StopIteration:
                break
            active_child_processes.append(child_process)
        else:
            # All process slots are full; sleep briefly rather than spin the CPU
            time.sleep(0.1)
    # Don't return until the last batch of children has finished
    for child_process in active_child_processes:
        child_process.wait()


def _filter_processes_still_active(processes):
    # poll() returns None while a process is still running
    return [process for process in processes if process.poll() is None]


launch_fixed_processes(50, "python child_process.py", example_arg_generator())

Pipeline

The second step of the process is basically something akin to a fold or reduce: I'm taking the data that was previously mapped and merging the results. In this case, I wanted to take hundreds of output files, have each processor load two distinct files and merge them, then repeat the process until everything was merged. Mind you, this was still dozens of gigabytes of data.

So in this case, I need a pipeline because the data is changing as I’m working with it. Once two files are merged, there’s a new file.

So in this code sample, I can just create a list of tasks and wait until they complete, then repeat with the new data:

def join_multiple_processes(process_count, base_command, argument_generator):
    ''' Launches up to process_count child processes and blocks until they all finish '''
    processes = []
    args = base_command.split()
    for _ in xrange(process_count):
        try:
            child_process = _launch_new_process(args, argument_generator)
        except StopIteration:
            break
        processes.append(child_process)
    return _block_until_processes_complete(processes)


def _block_until_processes_complete(processes):
    # wait() blocks until a process finishes and returns its exit code
    exit_codes = [p.wait() for p in processes]
    return exit_codes


join_multiple_processes(50, "python child_process.py", example_arg_generator())
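
To tie that back to the merge step: a driver loop that repeatedly pairs up output files and hands each pair to a merge script could be sketched roughly like this. The merge_outputs.py script, the output/*.dat naming, and the assumption that each merge replaces its two input files with a single merged file are all hypothetical stand-ins for what I actually ran:

import glob


def merge_pair_generator(file_paths):
    ''' Yields one "left,right" argument per pair of files to merge. The
        hypothetical merge_outputs.py is assumed to split its single argument
        on the comma and replace the two inputs with one merged file. '''
    for index in range(0, len(file_paths) - 1, 2):
        yield "%s,%s" % (file_paths[index], file_paths[index + 1])


output_files = sorted(glob.glob("output/*.dat"))
while len(output_files) > 1:
    # Each round merges files two at a time (one merge per processor on the
    # 32-core box), so the file count shrinks until only one file remains
    join_multiple_processes(32, "python merge_outputs.py", merge_pair_generator(output_files))
    output_files = sorted(glob.glob("output/*.dat"))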


In both examples, all that's happening is that Python is launching a new process with certain arguments. In this case, the arguments don't do anything, but this is how you could specify instructions to your subprocesses. I chose to implement that with a generator argument (which in my example just counts from 1 to 1000).
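
For completeness, if the child did need its instructions, it would just read them off the command line. A version of child_process.py that actually uses the generated argument might look something like this (just a sketch, treating the argument as a task ID):

# child_process.py, but actually reading the argument the parent passed in
import sys
import time
import random

task_id = sys.argv[1]  # the single value appended by _launch_new_process

wait_time = random.random() * 10
print "Task %s: waiting for %s seconds" % (task_id, wait_time)
time.sleep(wait_time)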

The End

In conclusion, I never actually read Moby Dick. Not once.