import time
import threading
from threading import Thread
import random
import sys
Shared Data and Synchronization¶
As we saw in our previous lecture, threads can access shared data. This is useful for communicating between threads and working with common variables, but it also introduces problems, mainly Race Conditions. Let's see an example of a Race Condition.
COUNTER = 0
def increment(n):
global COUNTER
for _ in range(n):
COUNTER += 1
ITERATIONS = 50_000
N_THREADS = 10
threads = [Thread(target=increment, args=(ITERATIONS,)) for _ in range(N_THREADS)]
[t.start() for t in threads];
[t.join() for t in threads];
COUNTER
assert COUNTER == (N_THREADS * ITERATIONS), f"Invalid value for counter: {COUNTER}"
What's happening here is that the operation += is not atomic, which means that, behind the scenes, += actually runs two (or more) operations:
aux = COUNTER + 1
COUNTER = aux
The threads execute concurrently (potentially at the same time), so a thread can read an outdated value of COUNTER before another thread has stored its update, which results in a race condition.
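We can actually see this by disassembling the function with the dis module (a quick sketch; the exact opcode names vary between Python versions, but the load/add/store split is always there, and a thread switch can happen between any two of those instructions):
import dis

def unsafe_increment():
    global COUNTER
    COUNTER += 1

# Prints something like: LOAD_GLOBAL COUNTER, LOAD_CONST 1, an in-place add opcode,
# and STORE_GLOBAL COUNTER -- several separate bytecode instructions, not one.
dis.dis(unsafe_increment)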
Thread Synchronization¶
How can we fix this race condition? Basically, we need a way to keep the threads from stepping onto each other's data, some signal that the given resource "is busy":
(Think of our recording studios: when the recording light is on, the studio is busy and nobody will enter the room.)
The simplest synchronization mechanism is a Lock, also known as a Mutex (mutual exclusion lock). Python includes the very intuitive threading.Lock
class. Let's see how a Lock works.
A Lock works just like the studio light: the first thread that "arrives" at the resource "turns on the light", or, more formally, "acquires the lock". Any other thread that reaches that point and wants to acquire the lock has to wait for the first thread to release it. Let's see an example:
Locking¶
lock = threading.Lock()
def lock_hogger(lock, sleep=10):
print("\t\tThread: Acquiring lock.")
lock.acquire()
print("\t\tThread: Lock acquired, sleeping")
if sleep:
time.sleep(sleep)
print("\t\tThread: Woke up, releasing lock")
lock.release()
t = Thread(target=lock_hogger, args=(lock, 10))
t.start()
Trying to acquire the lock here will probably block for a few seconds:
lock.locked()
lock.acquire()
print("Lock acquired!")
lock.locked()
Once the lock has been acquired, any other thread that tries to acquire it will block:
t = Thread(target=lock_hogger, args=(lock, 0))
t.start()
lock.release()
lock.locked()
Time to fix our counter!¶
Now that we know about locks, we can use them to fix our counter example:
COUNTER = 0
def increment(n, lock):
global COUNTER
for _ in range(n):
lock.acquire()
COUNTER += 1
lock.release()
ITERATIONS = 50_000
N_THREADS = 10
lock = threading.Lock()
threads = [Thread(target=increment, args=(ITERATIONS, lock)) for _ in range(N_THREADS)]
[t.start() for t in threads];
[t.join() for t in threads];
COUNTER
assert COUNTER == (len(threads) * ITERATIONS), f"Invalid value for counter: {COUNTER}"
It doesn't matter how many times we run the example, our code will always be synchronized!
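A quick side note (just a sketch, not part of the original example): acquiring and releasing the lock on every single iteration has a cost. A common refinement is to do the repetitive work on a thread-local variable and enter the critical section only once per thread:
COUNTER = 0

def increment_batched(n, lock):
    global COUNTER
    local_total = 0
    for _ in range(n):
        local_total += 1   # thread-local work, no lock needed here
    lock.acquire()         # one short critical section per thread
    COUNTER += local_total
    lock.release()
The final result is the same, but with 10 threads the lock is acquired 10 times instead of 500,000.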
Problems with synchronization¶
Locks are acquired before entering what we call "Critical Sections": important sections of our code that can potentially introduce race conditions. The usual process is:
lock = threading.Lock() # Problem [1]
# before entering critical section
lock.acquire() # Problem [2]
# critical section
do_your_thing() # Problem [3]
# after we're done with it
lock.release() # Problem [4]
The problem is that locks (and many other synchronization mechanisms) are "cooperative". You're cooperating by using locks, but nothing forces you to use them. In a team of n developers, if just one of them gets their lock management wrong, everybody loses.
These are the things that can potentially go wrong with cooperative, manual synchronization mechanisms:
- You might forget to use locks at all! You might have failed to recognize the situation as having a "critical section".
- You might forget to acquire the lock, getting directly into the critical section.
- Your critical section might use resources that are NOT protected by the lock you're holding, so other threads can still step on them (see the sketch after this list).
- You might forget to release the lock, or your code could break before you're able to release the lock (next example)
- Deadlocks! (more on this later).
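Here's a minimal sketch of problem 3 (the names are hypothetical, not from the lecture): both threads "use a lock", but each one protects COUNTER with a different lock, so the accesses are not actually exclusive and the race condition is still there:
COUNTER = 0
lock_a = threading.Lock()
lock_b = threading.Lock()

def increment_with_wrong_lock(n, lock):
    global COUNTER
    for _ in range(n):
        lock.acquire()   # each thread acquires *its own* lock...
        COUNTER += 1     # ...so nothing prevents the other thread from touching COUNTER
        lock.release()

t1 = Thread(target=increment_with_wrong_lock, args=(50_000, lock_a))
t2 = Thread(target=increment_with_wrong_lock, args=(50_000, lock_b))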
Problem No. 4 is very common; let's see an example of it:
lock = threading.Lock()
def faulty_lock_handler(lock, sleep=10):
print("\t\tThread: Acquiring lock.")
lock.acquire()
print("\t\tThread: Lock acquired")
if sleep:
time.sleep(sleep)
print("\t\tThread: Woke up, releasing lock")
lock.release()
t = Thread(target=faulty_lock_handler, args=(lock, 'x')) # The `sleep` param is incorrect, should be a number
t.start()
Trying to acquire the lock will block FOREVER:
lock.acquire()
We could instead pass a timeout to the acquire method: it will block for at most n seconds and, if it still hasn't acquired the lock by then, it'll return False:
lock.acquire(timeout=2)
Or we can make the call non-blocking: if it's not able to acquire the lock, it'll return False immediately instead of waiting:
lock.acquire(blocking=False)
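Both variants return a boolean, so in real code we'd check that value before touching the shared resource. A minimal sketch of that pattern (the helper is hypothetical, not from the lecture):
def careful_increment(lock, timeout=2):
    global COUNTER
    if lock.acquire(timeout=timeout):   # False means the lock was still busy
        COUNTER += 1
        lock.release()                  # only release a lock we actually acquired
    else:
        print("Could not acquire the lock, giving up")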
Thankfully, we have a handle on the lock variable, so we can release it from here, but this is cheating:
lock.release()
lock.acquire(blocking=False)
The way to solve this is to use Locks as Context Managers, so we're sure we'll release the lock EVEN if something goes wrong within the critical section:
lock = threading.Lock()
def fixed_lock_handler(lock, sleep=10):
print("\t\tThread: Acquiring lock.")
with lock:
print("\t\tThread: Lock acquired")
if sleep:
time.sleep(sleep)
print("\t\tThread: Woke up, releasing lock")
t = Thread(target=fixed_lock_handler, args=(lock, 5))
t.start()
lock.acquire()
lock.release()
t = Thread(target=fixed_lock_handler, args=(lock, 'x')) # The `sleep` param is incorrect, should be a number
t.start()
Is the lock still acquired?
lock.locked()
lock.acquire()
lock.release()
As you can see, the critical section failed with an exception, but the lock was released before exiting. As a reference, the with
context manager is syntactic sugar for the pattern:
lock.acquire()
try:
critical_section()
finally:
lock.release() # We'll release the lock no matter what
Fixing our counter using with¶
The last touch for our counter should be to use the context manager protocol of the lock object:
COUNTER = 0
def increment(n, lock):
global COUNTER
for _ in range(n):
with lock:
COUNTER += 1
ITERATIONS = 50_000
N_THREADS = 10
lock = threading.Lock()
threads = [Thread(target=increment, args=(ITERATIONS, lock)) for _ in range(N_THREADS)]
[t.start() for t in threads];
[t.join() for t in threads];
COUNTER
assert COUNTER == (len(threads) * ITERATIONS), f"Invalid value for counter: {COUNTER}"
Perfect!
Summary:¶
We've seen the importance of keeping our critical sections safe to avoid race conditions. But there's no free lunch: to prevent race conditions we have to use synchronization mechanisms, and as we saw, those can bring problems of their own.
In the next section we'll explore one of the many things that can go wrong with manual synchronization, and one of the scariest words in computer science: Deadlocks.