import time
import queue
import random
import multiprocessing as mp
from multiprocessing import Process
# Don't worry about this for now...
mp.set_start_method('fork')
Multiprocessing¶
The second most common way to write concurrent programs is with multiprocessing (I'm leaving asyncio aside). It's also the oldest concurrency concept, as processes predate threads.
In a multiprocessing program, we'll create multiple processes that will be run concurrently (and potentially in parallel) by the operating system. It's important to stress that when we write multiprocessing code, we're giving the OS full authority to manage and schedule our processes.
How can multiprocessing help with the GIL?¶
Remember that the GIL exists to protect data shared between threads (low-level data like reference counts). But what if there's NO shared data at all? If you recall our OS review, data is shared only within the same process; different processes DON'T share data. That means there's no GIL to worry about.
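Here's a minimal sketch of that isolation in action (the counter variable is just for illustration): a child process increments a module-level variable, but the parent's copy never changes.
counter = 0

def increment():
    global counter
    counter += 1  # modifies the child's own copy of counter, not the parent's

p = Process(target=increment)
p.start()
p.join()
p.close()
print(counter)  # still 0: the parent's memory was never touched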
Then, why not use multiprocessing all the time?¶
If multiprocessing doesn't suffer from the GIL, why not use it instead of multithreading? As usual with computers, there's no free lunch. Multiprocessing suffers from two major drawbacks:
1. Slower and resource-heavy¶
Creating a new process is a lot slower than spawning a thread, and by spawning new processes we're duplicating all of the parent process's information: its data, file descriptors, etc.
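To get a feel for the difference, here's a rough micro-benchmark sketch (exact numbers will vary by machine): it spawns and joins 50 threads, then 50 processes.
import threading

def noop():
    pass

start = time.time()
for _ in range(50):
    t = threading.Thread(target=noop)
    t.start()
    t.join()
print(f'50 threads: {time.time() - start:.3f}s')

start = time.time()
for _ in range(50):
    p = Process(target=noop)
    p.start()
    p.join()
    p.close()
print(f'50 processes: {time.time() - start:.3f}s')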

2. Hard to orchestrate¶
As processes don't share data, it's hard to coordinate results and flags between them. We'll see this in an example later.
The Process API¶
The multiprocessing module has a Process class with an API very similar to the one in threading.Thread.
Warning: always make sure you're using the multiprocessing module and not subprocess.
Let's see an example:
def say_hello():
    myself = mp.current_process()
    print(f'Hello World from "{myself.name}"')
p = Process(target=say_hello, name="My First Process")
p.start()
p.join()
It's important to free up the resources allocated by the process once we're done with it:
p.close()
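Before closing, the exitcode attribute tells us how the process ended; a quick sketch reusing say_hello from above:
p = Process(target=say_hello, name="Another Process")
p.start()
p.join()
print(p.exitcode)  # 0 means a clean exit; -N means the process was killed by signal N
p.close()          # after close(), most of the Process API raises ValueError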
Find prime example¶
Let's verify that processes can actually run in parallel by running our "check primes" example again. If they can, we'll see a significant improvement in running time. Remember, our multithreaded version took ~4 seconds to process all 10 numbers.
def is_prime(n):
    if n < 2:  # 0, 1, and negatives are not prime
        return False
    if n in (2, 3):
        return True
    if n % 2 == 0:
        return False
    for divisor in range(3, n, 2):
        if n % divisor == 0:
            return False
    return True
with open('prime_mixture.txt') as fp:
    numbers = [int(n.strip()) for n in fp.read().split() if n]
numbers[:5]
def check_prime_worker(number):
    if is_prime(number):
        print(f'{number} IS PRIME ✅', flush=True)
    else:
        print(f'{number} IS NOT PRIME ❌', flush=True)
processes = [Process(target=check_prime_worker, args=(number,)) for number in numbers]
start = time.time()
[p.start() for p in processes];
[p.join() for p in processes];
time.time() - start
[p.close() for p in processes];
We can see a clear running time improvement, from ~4 seconds to ~0.7, which means that processes are indeed running in parallel.
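Keep in mind that the speedup is bounded by the number of CPUs available; mp.cpu_count() reports how many this machine has:
print(f'This machine can run up to {mp.cpu_count()} processes in parallel')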
Sharing data with processes¶
Sharing data between processes is not as simple as with threads. In our multithreaded example, we just passed a results dictionary that the threads used to store their results. Here we can't do that, so we had to settle for printing the result, which is useless in a real-life program.
There are several mechanisms to share data with multiprocessing; in this lesson we'll focus on Queues and Pipes.
Queues¶
Queues in the multiprocessing module have an API very similar to the thread-safe ones in the queue module, so let's just see an example:
work_to_do = mp.JoinableQueue()
work_done = mp.SimpleQueue()
[work_to_do.put(n) for n in numbers];
MAX_WORKERS = 5
def process_consumer(task_queue, results_queue):
    while True:
        try:
            number = task_queue.get_nowait()
            result = is_prime(number)
            results_queue.put((number, result))
        except queue.Empty:
            print('No more numbers to process. Exiting...')
            return
process_pool = [Process(target=process_consumer, args=(work_to_do, work_done)) for _ in range(MAX_WORKERS)]
[p.start() for p in process_pool];
[p.join() for p in process_pool];
[p.close() for p in process_pool];
while not work_done.empty():
    number, prime = work_done.get()
    if prime:
        print(f'{number} IS PRIME ✅')
    else:
        print(f'{number} IS NOT PRIME ❌')
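Since work_to_do is a JoinableQueue, another common coordination pattern is to mark every item with task_done() and block on the queue's join(). A sketch (the names tasks, results, and consumer are mine, and the None sentinel is arbitrary):
tasks = mp.JoinableQueue()
results = mp.SimpleQueue()

def consumer(task_queue, results_queue):
    while True:
        number = task_queue.get()   # blocks until an item is available
        if number is None:          # sentinel: no more work
            task_queue.task_done()
            return
        results_queue.put((number, is_prime(number)))
        task_queue.task_done()      # mark this item as processed

for n in numbers:
    tasks.put(n)
for _ in range(MAX_WORKERS):
    tasks.put(None)                 # one sentinel per worker

workers = [Process(target=consumer, args=(tasks, results)) for _ in range(MAX_WORKERS)]
[w.start() for w in workers];
tasks.join()                        # returns once every put item got a task_done()
[w.join() for w in workers];
[w.close() for w in workers];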
Pipes¶
Pipes are not as safe as queues: data can be corrupted, and it's hard to know how long to keep polling the pipe. In the following example, we assume we'll receive all 10 messages we're expecting, since we're starting 10 processes. But in reality, one of those processes could die before sending its message, and we'd wait forever.
main_conn, worker_conn = mp.Pipe()
def process_prime_worker(number, pipe_connection):
    result = is_prime(number)
    pipe_connection.send((number, result))
processes = [Process(target=process_prime_worker, args=(number, worker_conn)) for number in numbers]
[p.start() for p in processes];
[p.join() for p in processes];
[p.close() for p in processes];
received = 0
while received < 10:
    number, prime_result = main_conn.recv()
    received += 1
    if prime_result:
        print(f'{number} IS PRIME ✅')
    else:
        print(f'{number} IS NOT PRIME ❌')
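One way to guard against a worker dying before it sends its message is to poll the connection with a timeout instead of blocking forever on recv(). A sketch of the receiving loop rewritten that way (the 5-second timeout is arbitrary):
received = 0
while received < len(numbers):
    if not main_conn.poll(5):  # wait at most 5 seconds for a message
        print('Timed out waiting for a worker. Giving up.')
        break
    number, prime_result = main_conn.recv()
    received += 1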
Process Pools¶
The multiprocessing module contains a very useful Pool class that manages a pool of worker processes for us. Let's see an example:
with mp.Pool(processes=4) as pool:
    n1 = random.choice(numbers)
    n2 = random.choice(numbers)
    r1 = pool.apply_async(is_prime, (n1,))
    r2 = pool.apply_async(is_prime, (n2,))
    print(f"Number {n1} {'is prime' if r1.get() else 'is NOT prime'}")
    print(f"Number {n2} {'is prime' if r2.get() else 'is NOT prime'}")
As you can see, we're using our regular is_prime function. This is important: arguments and results are serialized and shipped between processes behind the scenes, which gives the impression of "data sharing" and simplifies the API. The apply_async method submits a "task" to perform; it's computed in a worker process, and an AsyncResult is returned immediately.
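Besides get(), an AsyncResult can be inspected without blocking indefinitely; a small sketch (the timeouts are arbitrary):
with mp.Pool(processes=4) as pool:
    r = pool.apply_async(is_prime, (random.choice(numbers),))
    r.wait(timeout=5)        # block for at most 5 seconds
    print(r.ready())         # True once the task has finished
    print(r.get(timeout=1))  # get() also accepts a timeout; raises TimeoutError if it expires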
Pools have other useful methods, like map and map_async (and other variants like imap or starmap). The map method is similar to the built-in map function, or a list comprehension. Let's see an example that processes all our numbers:
start = time.time()
with mp.Pool(processes=10) as pool:
    results = pool.map(is_prime, numbers)
    print(f"Found {sum(results)} primes for a total of {len(results)}")
time.time() - start
The map interface is very convenient: it feels like a regular synchronous Python API, but behind the scenes it's using a pool of multiple processes. In our next lesson, when we talk about concurrent.futures, we'll see why familiar and intuitive interfaces make our lives easier.
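And since we mentioned imap and starmap, here's a quick sketch of both: imap yields results lazily instead of building the whole list at once, and starmap unpacks argument tuples for functions that take more than one parameter.
with mp.Pool(processes=4) as pool:
    # imap yields results lazily, in order, as the workers produce them
    for number, result in zip(numbers, pool.imap(is_prime, numbers)):
        print(number, 'is prime' if result else 'is NOT prime')

    # starmap unpacks each tuple into positional arguments
    print(pool.starmap(pow, [(2, 5), (3, 2)]))  # [32, 9]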
Summary¶
In this lesson we're just scratching the surface of multiprocessing. Sadly, working with multiple processes is a lot harder than working with threads, as it requires a deeper understanding of the operating system and is a lot less safe.