Concurrency-12-Multiprocessing

Last updated: December 27th, 2020
In [1]:
import time
import queue
import random
import multiprocessing as mp
from multiprocessing import Process
In [2]:
# Don't worry about this for now...
mp.set_start_method('fork')

Multiprocessing

The second most common way to write concurrent programs is to use multiprocessing (I'm leaving asyncio aside). It's the oldest concurrency concept, as processes predate threads.

In a multiprocessing program, we'll create multiple processes that will be run concurrently (and potentially in parallel) by the operating system. It's important to stress that when we write multiprocessing code, we're giving the OS full authority to manage and schedule our processes.

How can multiprocessing help with the GIL?

The GIL's main job is to protect shared data (low-level data like reference counts) between threads. But what if there's NO shared data at all? If you remember from our OS review, data is shared only within the same process; different processes DON'T share data. That means there's no GIL to worry about.
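
A quick way to see this isolation is the following minimal sketch (a made-up example, not part of the lesson's exercises): a global variable mutated in a child process stays unchanged in the parent, because each process works on its own copy of memory.

counter = 0

def increment():
    # Runs in the child process, on the child's own copy of `counter`.
    global counter
    counter += 1
    print(f'Child sees counter = {counter}')  # prints 1

p = Process(target=increment)
p.start()
p.join()
p.close()
print(f'Parent still sees counter = {counter}')  # prints 0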

Then, why not use multiprocessing all the time?

If multiprocessing doesn't suffer from the GIL, why not use it instead of multithreading? As usual with computers, there's no free lunch. Multiprocessing suffers from 2 major drawbacks:

1. Slower, resource heavy

Creating new processes is a lot slower than spawning threads, and each new process duplicates all the information of its parent: shared data, file descriptors, etc. (the sketch after this list gives a rough timing comparison).

2. Hard to orchestrate

As processes don't share data, it's hard to coordinate results and flags between multiple processes. We'll see this in an example later.
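
To get a feel for drawback 1, here's a minimal timing sketch (the numbers are illustrative and depend heavily on the OS and the start method; with 'spawn' the gap is even larger):

from threading import Thread

def noop():
    pass

start = time.perf_counter()
threads = [Thread(target=noop) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f'50 threads:   {time.perf_counter() - start:.4f}s')

start = time.perf_counter()
procs = [Process(target=noop) for _ in range(50)]
for p in procs:
    p.start()
for p in procs:
    p.join()
print(f'50 processes: {time.perf_counter() - start:.4f}s')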

The Process API

The multiprocessing module has a Process class with an API very similar to the one in threading.Thread.

Warning: Always make sure you're using the module multiprocessing and not subprocess.

Let's see an example:

In [3]:
def say_hello():
    myself = mp.current_process()
    print(f'Hello World from "{myself.name}"')
In [4]:
p = Process(target=say_hello, name="My First Process")
In [5]:
p.start()
Hello World from "My First Process"
In [6]:
p.join()

It's important to free up the resources allocated by the process:

In [7]:
p.close()

Find prime example

Let's verify that processes can actually run in parallel by running our "check primes" example again. If that's the case, we'll see a significant improvement in time. Remember, our multi-threaded version took ~4 seconds to process all 10 numbers.

In [8]:
def is_prime(n):
    # Deliberately naive trial division (no sqrt cutoff), to keep the task CPU-bound.
    if n in (2, 3):
        return True
    if n % 2 == 0:
        return False
    for divisor in range(3, n, 2):
        if n % divisor == 0:
            return False
    return True
In [9]:
with open('prime_mixture.txt') as fp:
    numbers = [int(n.strip()) for n in fp.read().split() if n]
In [10]:
numbers[:5]
Out[10]:
[15492781, 15492787, 15492803, 15492811, 15492810]
In [11]:
def check_prime_worker(number):
    if is_prime(number):
        print(f'{number} IS PRIME ✅', flush=True)
    else:
        print(f'{number} IS NOT PRIME ❌', flush=True)
In [12]:
processes = [Process(target=check_prime_worker, args=(number,)) for number in numbers]
In [13]:
start = time.time()
In [14]:
[p.start() for p in processes];
15492810 IS NOT PRIME ❌
15492811 IS PRIME ✅
15492803 IS PRIME ✅
15492833 IS PRIME ✅
15492781 IS PRIME ✅
15520301 IS PRIME ✅
15492787 IS PRIME ✅
15492859 IS PRIME ✅
15502547 IS PRIME ✅
15527509 IS PRIME ✅
In [15]:
[p.join() for p in processes];
In [16]:
time.time() - start
Out[16]:
2.485031843185425
In [17]:
[p.close() for p in processes];

We can see a clear improvement in running time, from ~4 seconds to ~2.5, which means that processes are indeed running in parallel.

Sharing data with processes

Sharing data between processes is not as simple as it is with threads. In our multithreaded example, we just passed in a results dictionary that the threads used to store their results. Here we can't do that, so we just printed the results, which is useless for a real-life program.

There are several mechanisms to share data with multiprocessing; in this lesson we'll focus on Queues and Pipes.

Queues

Queues in the multiprocessing module have an API very similar to the thread-safe ones in the queue module, so let's just see an example:

In [18]:
work_to_do = mp.JoinableQueue()
work_done = mp.SimpleQueue()
In [19]:
[work_to_do.put(n) for n in numbers];
In [20]:
MAX_WORKERS = 5
In [21]:
def process_consumer(task_queue, results_queue):
    while True:
        try:
            number = task_queue.get_nowait()
            result = is_prime(number)
            results_queue.put((number, result))
        except queue.Empty:
            print('No more numbers to process. Exiting...')
            return
In [22]:
process_pool = [Process(target=process_consumer, args=(work_to_do, work_done)) for _ in range(MAX_WORKERS)]
In [23]:
[p.start() for p in process_pool];
No more numbers to process. Exiting...
No more numbers to process. Exiting...
No more numbers to process. Exiting...
No more numbers to process. Exiting...
No more numbers to process. Exiting...
In [24]:
[p.join() for p in process_pool];
In [25]:
[p.close() for p in process_pool];
In [26]:
while not work_done.empty():
    number, prime = work_done.get()
    if prime:
        print(f'{number} IS PRIME ✅')
    else:
        print(f'{number} IS NOT PRIME ❌')
15492810 IS NOT PRIME ❌
15492833 IS PRIME ✅
15492781 IS PRIME ✅
15492787 IS PRIME ✅
15492803 IS PRIME ✅
15492811 IS PRIME ✅
15492859 IS PRIME ✅
15520301 IS PRIME ✅
15502547 IS PRIME ✅
15527509 IS PRIME ✅
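
Note that we created work_to_do as a JoinableQueue but only used it as a plain queue. Here's a minimal sketch of its distinctive feature (assuming the same is_prime, numbers, and MAX_WORKERS from above): consumers call task_done() after each item, and the parent can block on join() until every item has been processed.

def joinable_consumer(task_queue, results_queue):
    while True:
        try:
            number = task_queue.get_nowait()
        except queue.Empty:
            return
        results_queue.put((number, is_prime(number)))
        task_queue.task_done()  # mark this item as fully processed

tasks = mp.JoinableQueue()
results = mp.SimpleQueue()
for n in numbers:
    tasks.put(n)

workers = [Process(target=joinable_consumer, args=(tasks, results)) for _ in range(MAX_WORKERS)]
for w in workers:
    w.start()

tasks.join()  # blocks until task_done() has been called once per item
for w in workers:
    w.join()
    w.close()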

Pipes

Pipes are not as safe as Queues: data can be corrupted, and it's hard to know when to start polling the pipe. In the following example, we assume we'll receive all 10 messages we're expecting, given that we're starting 10 processes. But in reality, one of those processes could die before sending its message, and we'd wait forever (see the sketch after this example for one way to avoid blocking indefinitely).

In [27]:
main_conn, worker_conn = mp.Pipe()
In [28]:
def process_prime_worker(number, pipe_connection):
    result = is_prime(number)
    pipe_connection.send((number, result))
In [29]:
processes = [Process(target=process_prime_worker, args=(number, worker_conn)) for number in numbers]
In [30]:
[p.start() for p in processes];
In [31]:
[p.join() for p in processes];
In [32]:
[p.close() for p in processes];
In [33]:
received = 0
while received < 10:
    number, prime_result = main_conn.recv()
    received += 1
    if prime_result:
        print(f'{number} IS PRIME ✅')
    else:
        print(f'{number} IS NOT PRIME ❌')
15492810 IS NOT PRIME ❌
15492811 IS PRIME ✅
15492803 IS PRIME ✅
15492859 IS PRIME ✅
15527509 IS PRIME ✅
15492781 IS PRIME ✅
15492787 IS PRIME ✅
15520301 IS PRIME ✅
15502547 IS PRIME ✅
15492833 IS PRIME ✅
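
One way to mitigate that risk is to poll the connection with a timeout instead of calling recv() unconditionally. A minimal sketch (the 5-second timeout is an arbitrary choice for illustration):

received = 0
while received < len(numbers):
    if not main_conn.poll(5):  # wait up to 5 seconds for a message
        print('Timed out waiting for results; a worker may have died.')
        break
    number, prime_result = main_conn.recv()
    received += 1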

Process Pools

The multiprocessing module contains a very useful Pool class, which manages a pool of worker processes that we can submit tasks to:

In [35]:
with mp.Pool(processes=4) as pool:
    n1 = random.choice(numbers)
    n2 = random.choice(numbers)

    r1 = pool.apply_async(is_prime, (n1, ))
    r2 = pool.apply_async(is_prime, (n2, ))

    print(f"Number {n1} {'is prime' if r1.get() else 'is NOT prime'}")
    print(f"Number {n2} {'is prime' if r2.get() else 'is NOT prime'}")
Number 15492810 is NOT prime
Number 15502547 is prime

As you can see, we're using our regular is_prime function. This is important: behind the scenes the function and its arguments are serialized (pickled) and sent over to the worker processes, which simplifies the API. The apply_async method submits a "task" to perform; it's computed behind the scenes, and an AsyncResult is returned.
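
apply_async also accepts a callback that is invoked in the parent process when the result is ready, and AsyncResult.get() takes an optional timeout. A minimal sketch:

def report(result):
    # Invoked in the parent process once the worker finishes.
    print(f'Worker finished, result: {result}')

with mp.Pool(processes=2) as pool:
    async_result = pool.apply_async(is_prime, (numbers[0],), callback=report)
    # get() re-raises worker exceptions, and raises multiprocessing.TimeoutError
    # if the result isn't ready within the timeout.
    print(async_result.get(timeout=30))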

Pools have other useful methods, for example map and map_async (and other variants like imap or starmap). The map method is similar to the map built-in function, or a list comprehension. Let's see an example that processes all our numbers:

In [36]:
start = time.time()
In [37]:
with mp.Pool(processes=10) as pool:
    results = pool.map(is_prime, numbers)
    print(f"Found {sum(results)} primes for a total of {len(results)}")
Found 9 primes for a total of 10
In [38]:
time.time() - start
Out[38]:
0.7631411552429199

The map interface is very convenient: it feels like a regular, synchronous Python API, but behind the scenes it's using a pool of multiple processes. In our next lesson, when we talk about concurrent.futures, we'll see why familiar and intuitive interfaces make our lives easier.
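
For completeness, here's a quick sketch of two of the variants mentioned above (is_prime_labeled is a made-up helper for this example): starmap unpacks argument tuples, and imap_unordered yields results as soon as they're ready, not in input order.

def is_prime_labeled(number, label):
    return label, is_prime(number)

with mp.Pool(processes=4) as pool:
    # starmap: each tuple in the iterable is unpacked into the function's arguments.
    print(pool.starmap(is_prime_labeled, [(n, f'n={n}') for n in numbers[:3]]))

    # imap_unordered: lazily yields results in completion order.
    for result in pool.imap_unordered(is_prime, numbers, chunksize=2):
        print(result)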

Summary

In this lesson we've just scratched the surface of multiprocessing. Sadly, working with multiple processes is a lot harder than working with threads, as it requires a deeper understanding of the operating system and is a lot less safe.
