Concurrency-5-Data-and-Sync

Last updated: February 15th, 2021
In [1]:
import time
import threading
from threading import Thread
import random
import sys

Shared Data and Synchronization

As we saw in our previous lecture, threads can access shared data. This is useful for communicating between threads and for sharing common variables. But it also introduces problems, mainly race conditions. Let's see an example of a race condition.

In [27]:
COUNTER = 0
In [28]:
def increment(n):
    global COUNTER
    for _ in range(n):
        COUNTER += 1
In [29]:
ITERATIONS = 50_000
N_THREADS = 10
In [30]:
threads = [Thread(target=increment, args=(ITERATIONS,)) for _ in range(N_THREADS)]
In [31]:
[t.start() for t in threads];
In [32]:
[t.join() for t in threads];
In [33]:
COUNTER
Out[33]:
419138
In [ ]:
assert COUNTER == (N_THREADS * ITERATIONS), f"Invalid value for counter: {COUNTER}"

What's happening here is that the += operation is not atomic: behind the scenes, += runs two (or more) separate operations:

aux = COUNTER + 1
COUNTER = aux
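If you're curious, CPython's standard dis module shows these separate steps at the bytecode level. A minimal sketch (the function name increment_once is just for illustration, and the exact opcode names vary between Python versions): reading COUNTER (LOAD_GLOBAL) and writing it back (STORE_GLOBAL) are different instructions, and the interpreter can switch threads in between them.

```python
import dis


def increment_once():
    global COUNTER
    COUNTER += 1  # we only disassemble this function; it is never called


dis.dis(increment_once)

# Just the opcode names, to check that reading and writing COUNTER
# are two different instructions.
opnames = [ins.opname for ins in dis.get_instructions(increment_once)]
print(opnames)
```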

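All the threads are executing concurrently, so their steps can interleave: a thread can read COUNTER, get paused before writing back, and then overwrite the newer value another thread wrote in the meantime with its own outdated result. Every time that happens an increment is lost. That's a race condition.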
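To make the lost update concrete, here's a sketch that replays one unlucky interleaving of two threads, A and B, by hand. No real threads are involved (simulate_lost_update is a hypothetical helper), which makes the outcome deterministic:

```python
def simulate_lost_update():
    """Replay one possible interleaving of two threads, A and B,
    that both execute `counter += 1` on the same shared counter."""
    counter = 0

    a_new = counter + 1  # A reads 0 and computes 1, then gets paused
    b_new = counter + 1  # B also reads 0 and computes 1
    counter = b_new      # B writes 1
    counter = a_new      # A resumes and writes its stale 1

    return counter


print(simulate_lost_update())  # 1, even though two increments ran
```

Two increments ran, but the counter only went up by one. With 10 threads doing 50,000 increments each, this happens often enough to lose tens of thousands of updates.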

Thread Synchronization

How can we fix this race condition? We need a way to keep the threads from stepping on each other's data: some signal that a given resource "is busy":

(Image: the recording light in our studios. When the light is on, the studio is busy and nobody enters the room.)

The simplest synchronization mechanism is a Lock, also called a Mutex (mutual exclusion lock). Python includes the very intuitive threading.Lock class. Let's see how a Lock works.

A Lock works in the same way as the studio light from the picture. The first thread that "arrives" at a given resource "turns on the light", or, formally, "acquires the lock". Any other thread that reaches that point and wants to acquire the lock has to wait for the first thread to "release" it. Let's see an example:

Locking

In [34]:
lock = threading.Lock()
In [35]:
def lock_hogger(lock, sleep=10):
    print("\t\tThread: Acquiring lock.")
    lock.acquire()
    print("\t\tThread: Lock acquired, sleeping")
    if sleep:
        time.sleep(sleep)
    print("\t\tThread: Woke up, releasing lock")
    lock.release()
In [36]:
t = Thread(target=lock_hogger, args=(lock, 10))
In [37]:
t.start()
		Thread: Acquiring lock.
		Thread: Lock acquired, sleeping

Trying to acquire the lock here will block until the thread wakes up and releases it:

In [38]:
lock.locked()
Out[38]:
True
In [39]:
lock.acquire()
print("Lock acquired!")
		Thread: Woke up, releasing lock
Lock acquired!
In [40]:
lock.locked()
Out[40]:
True

Once the lock has been acquired, any other thread that tries to acquire it will block:

In [41]:
t = Thread(target=lock_hogger, args=(lock, 0))
In [42]:
t.start()
		Thread: Acquiring lock.
In [43]:
lock.release()
		Thread: Lock acquired, sleeping
		Thread: Woke up, releasing lock
In [44]:
lock.locked()
Out[44]:
False

Time to fix our counter!

Now that we know about locks, we can use them to fix our counter example:

In [62]:
COUNTER = 0
In [63]:
def increment(n, lock):
    global COUNTER
    for _ in range(n):
        lock.acquire()
        COUNTER += 1
        lock.release()
In [64]:
ITERATIONS = 50_000
N_THREADS = 10
In [65]:
lock = threading.Lock()
In [66]:
threads = [Thread(target=increment, args=(ITERATIONS, lock)) for _ in range(N_THREADS)]
In [67]:
[t.start() for t in threads];
In [68]:
[t.join() for t in threads];
In [69]:
COUNTER
Out[69]:
500000
In [53]:
assert COUNTER == (len(threads) * ITERATIONS), f"Invalid value for counter: {COUNTER}"

No matter how many times we run the example, the counter now always ends up with the expected value!
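One way to convince yourself is to repeat the experiment several times. The sketch below runs the locked version five times, with smaller numbers to keep it quick. run_locked_counter is a hypothetical helper, and it keeps the counter in a closure instead of a global so that each run starts from zero:

```python
import threading
from threading import Thread


def run_locked_counter(n_threads=10, iterations=50_000):
    """Run the locked counter experiment once and return the final count."""
    counter = 0
    lock = threading.Lock()

    def increment(n):
        nonlocal counter
        for _ in range(n):
            lock.acquire()
            counter += 1
            lock.release()

    threads = [Thread(target=increment, args=(iterations,)) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


# Five independent runs of 10 threads x 10,000 increments each.
results = [run_locked_counter(iterations=10_000) for _ in range(5)]
print(results)  # [100000, 100000, 100000, 100000, 100000]
```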

Problems with synchronization

Locks are acquired before entering what we call "critical sections": sections of our code that access shared resources and can therefore introduce race conditions. The usual process is:

lock = threading.Lock() # Problem [1]

# before entering critical section
lock.acquire()          # Problem [2]

# critical section
do_your_thing()         # Problem [3]

# after we're done with it
lock.release()          # Problem [4]

The problem is that locks (and many other synchronization mechanisms) are "cooperative". You cooperate by using them, but nothing forces you to. In a team of n developers, if just one of them gets their lock management wrong, everybody loses.

These are the things that can potentially go wrong with cooperative, manual synchronization mechanisms:

  1. You might forget to use locks at all! You might have failed to recognize the situation as a "critical section".
  2. You might forget to acquire the lock, going directly into the critical section.
  3. Your critical section might be using resources NOT protected by the lock you're using, so other threads can step on them.
  4. You might forget to release the lock, or your code could fail before it gets to release it (next example).
  5. Deadlocks! (more on this later).
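Problems 1 and 3 can also sneak in through where the lock is created. In this hypothetical broken_increment, every thread creates its own lock, so each thread always acquires its lock immediately and the shared COUNTER is effectively unprotected:

```python
import threading
from threading import Thread

COUNTER = 0


def broken_increment(n):
    global COUNTER
    # BUG: a brand-new lock per thread. No other thread ever sees it,
    # so acquiring it never makes anyone wait.
    lock = threading.Lock()
    for _ in range(n):
        lock.acquire()
        COUNTER += 1
        lock.release()


threads = [Thread(target=broken_increment, args=(50_000,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Not guaranteed to be 500000: the increments are not mutually exclusive.
print(COUNTER)
```

Depending on your Python version and on scheduling luck, this may even print 500000; the point is that nothing guarantees it. The lock has to be a single object shared by every thread that touches COUNTER.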

Problem No. 4 is very common; let's see an example of it:

In [70]:
lock = threading.Lock()
In [71]:
def faulty_lock_handler(lock, sleep=10):
    print("\t\tThread: Acquiring lock.")
    lock.acquire()
    print("\t\tThread: Lock acquired")
    if sleep:
        time.sleep(sleep)
    print("\t\tThread: Woke up, releasing lock")
    lock.release()
In [72]:
t = Thread(target=faulty_lock_handler, args=(lock, 'x'))  # The `sleep` param is incorrect, should be a number
In [73]:
t.start()
		Thread: Acquiring lock.
		Thread: Lock acquired
Exception in thread Thread-75:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-71-40265bd9d4c2>", line 6, in faulty_lock_handler
TypeError: an integer is required (got type str)

Trying to acquire the lock now blocks FOREVER (we had to interrupt the kernel to stop it):

In [74]:
lock.acquire()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-74-7d28dc795612> in <module>
----> 1 lock.acquire()

KeyboardInterrupt: 

We can pass a timeout to acquire: it will block for up to timeout seconds, and if it hasn't acquired the lock by then, it returns False:

In [77]:
lock.acquire(timeout=2)
Out[77]:
False

Or we can even make it non-blocking: if it's not able to acquire the lock, it returns False immediately:

In [78]:
lock.acquire(blocking=False)
Out[78]:
False
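Both variants return a boolean, so code that uses them must check it: calling release() on a lock you never acquired raises a RuntimeError if the lock is free, or, worse, releases a lock another thread is holding. A minimal sketch of the pattern, where try_critical_section is a hypothetical helper and the print stands in for the real work:

```python
import threading


def try_critical_section(lock, timeout=2):
    """Do the (placeholder) work only if `lock` can be acquired within
    `timeout` seconds. Returns True if the work ran, False otherwise."""
    if not lock.acquire(timeout=timeout):
        # We never got the lock, so we must NOT call lock.release() here.
        print("Couldn't acquire the lock, giving up")
        return False
    try:
        print("Lock acquired, doing the work")  # placeholder critical section
        return True
    finally:
        lock.release()  # runs even though we `return` inside the try


demo_lock = threading.Lock()
print(try_critical_section(demo_lock))  # free lock: prints the message, then True

demo_lock.acquire()  # pretend some other code is holding the lock
print(try_critical_section(demo_lock, timeout=0.5))  # times out: False
demo_lock.release()
```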

Thankfully, we have a handle on the lock variable, and a threading.Lock can be released from any thread (not only the one that acquired it), so we can release it from here. But this is cheating:

In [79]:
lock.release()
In [80]:
lock.acquire(blocking=False)
Out[80]:
True

The way to solve this is to use locks as context managers, so we're sure the lock is released EVEN if something goes wrong within the critical section:

In [81]:
lock = threading.Lock()
In [82]:
def fixed_lock_handler(lock, sleep=10):
    print("\t\tThread: Acquiring lock.")
    with lock:
        print("\t\tThread: Lock acquired")
        if sleep:
            time.sleep(sleep)
        print("\t\tThread: Woke up, releasing lock")  # the lock is released when the `with` block ends
In [83]:
t = Thread(target=fixed_lock_handler, args=(lock, 5))
In [84]:
t.start()
		Thread: Acquiring lock.
		Thread: Lock acquired
		Thread: Woke up, releasing lock
In [85]:
lock.acquire()
Out[85]:
True
In [86]:
lock.release()
In [87]:
t = Thread(target=fixed_lock_handler, args=(lock, 'x'))  # The `sleep` param is incorrect, should be a number
In [88]:
t.start()
		Thread: Acquiring lock.
		Thread: Lock acquired
Exception in thread Thread-77:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-82-9d5dd1ab9e64>", line 6, in fixed_lock_handler
TypeError: an integer is required (got type str)

Is the lock still acquired?

In [89]:
lock.locked()
Out[89]:
False
In [90]:
lock.acquire()
Out[90]:
True
In [91]:
lock.release()

As you can see, the critical section failed with an exception, but the lock was still released. For reference, the with statement here is equivalent to the pattern:

lock.acquire()
try:
    critical_section()
finally:
    lock.release()  # We'll release the lock no matter what
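The with statement works with any object implementing the context manager protocol: __enter__ runs on entry and __exit__ runs on exit, whether or not an exception was raised. As a sketch of what threading.Lock does for us, here's a hypothetical LoggedLock wrapper that prints when it acquires and releases:

```python
import threading


class LoggedLock:
    """Hypothetical wrapper around threading.Lock that shows what
    `with lock:` does: acquire on entry, release on exit, always."""

    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        print("acquired")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Called on normal exit AND when the block raises.
        self._lock.release()
        print("released")
        return False  # don't suppress the exception

    def locked(self):
        return self._lock.locked()


logged_lock = LoggedLock()
try:
    with logged_lock:
        raise ValueError("the critical section failed")
except ValueError as exc:
    print("caught:", exc)

print(logged_lock.locked())  # False
```

Returning False from __exit__ lets the exception propagate, just like the real lock does.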

Fixing our counter using with

The finishing touch for our counter is to use the lock's context manager protocol, through the with statement:

In [92]:
COUNTER = 0
In [93]:
def increment(n, lock):
    global COUNTER
    for _ in range(n):
        with lock:
            COUNTER += 1
In [94]:
ITERATIONS = 50_000
N_THREADS = 10
In [95]:
lock = threading.Lock()
In [96]:
threads = [Thread(target=increment, args=(ITERATIONS, lock)) for _ in range(N_THREADS)]
In [97]:
[t.start() for t in threads];
In [98]:
[t.join() for t in threads];
In [99]:
COUNTER
Out[99]:
500000
In [100]:
assert COUNTER == (len(threads) * ITERATIONS), f"Invalid value for counter: {COUNTER}"

Perfect!
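A side note on design: this version acquires and releases the lock 500,000 times (10 threads × 50,000 iterations). When a thread doesn't need to see the shared value while it works, a common alternative, sketched below as a variant that isn't part of the original notebook (increment_batched is a hypothetical name), is to count in a local variable and enter the critical section only once per thread:

```python
import threading
from threading import Thread

COUNTER = 0


def increment_batched(n, lock):
    global COUNTER
    local_total = 0
    for _ in range(n):
        local_total += 1  # local variable: only this thread sees it
    with lock:  # a single, short critical section per thread
        COUNTER += local_total


ITERATIONS = 50_000
N_THREADS = 10
lock = threading.Lock()

threads = [Thread(target=increment_batched, args=(ITERATIONS, lock)) for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(COUNTER)  # 500000
```

Fewer critical sections mean less time threads spend waiting on each other; the trade-off is that COUNTER only reflects a thread's work once that thread has finished.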

Summary:

We've seen the importance of keeping our critical sections safe to avoid race conditions. But there's no free lunch: preventing race conditions requires synchronization mechanisms, and, as we saw, those can introduce issues of their own.

In the next section we'll explore one of the many things that can go wrong with manual synchronization, and one of the scariest words in computer science: deadlocks.
