Concurrency-4-Python-Threads

Last updated: December 27th, 2020
In [33]:
import time
import random
import threading
from threading import Thread

In Python 3, threading is the module used to create and use threads. There's also a low-level module, _thread, but it's not meant to be used directly. I'm mentioning it only as a warning: don't use _thread!

The most important class in the threading module is Thread (doh!).

Very simplified, this is how a thread is instantiated:

class Thread:
    def __init__(self, target=None, name=None, args=(), kwargs=None):
        pass

(there's also a group argument, which should always be None, as it's reserved for future use)

Here, target is the function (any callable, really) that will be executed in the new thread; args (a tuple) and kwargs (a dict) are the positional and keyword arguments passed to it.

Once a thread has been created (instantiated), we need to call its start() method for it to begin running.
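A detail that's easy to miss: run() is the method that start() eventually calls inside the new thread. Calling run() yourself just executes the target in the current thread, with no concurrency at all. A minimal sketch of the difference (the function and thread names are made up for illustration):

```python
import threading

seen = []  # names of the threads that actually executed the target

def record_thread_name():
    # current_thread() returns the Thread object running this code
    seen.append(threading.current_thread().name)

direct = threading.Thread(target=record_thread_name, name='Direct')
direct.run()     # plain method call: executes synchronously in the calling thread

started = threading.Thread(target=record_thread_name, name='Started')
started.start()  # creates a new thread, which then calls run() itself
started.join()   # wait for it to finish before reading `seen`

print(seen)  # ['MainThread', 'Started'] when run from the main thread
```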

Basic example of a thread

In [34]:
def simple_worker():
    print('hello', flush=True)
    time.sleep(5)
    print('world', flush=True)
In [35]:
t1 = Thread(target=simple_worker)
In [36]:
t1.start()
hello
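Only hello is printed because start() hands control back to the notebook immediately; world is printed about 5 seconds later by the background thread. This sketch measures that behavior, using a shorter sleep as an assumption to keep it quick:

```python
import threading
import time

def slow_worker():
    time.sleep(0.5)  # stands in for the 5-second sleep above

t = threading.Thread(target=slow_worker)
start = time.perf_counter()
t.start()                                   # returns almost immediately
returned_after = time.perf_counter() - start
t.join()                                    # blocks until slow_worker() returns
joined_after = time.perf_counter() - start

print(f'start() returned after {returned_after:.3f}s, join() after {joined_after:.3f}s')
```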

Running multiple threads in parallel

In [37]:
def simple_worker(thread_number):
    time.sleep(random.random() * 5)
    print(f'{thread_number} done!')
In [40]:
t1 = Thread(target=simple_worker, args=('Thread 1',))
t2 = Thread(target=simple_worker, args=('Thread 2',))
In [41]:
t1.start()
t2.start()
Thread 2 done!
Thread 1 done!

Notice that the order of completion might change from one run to another.

Thread States

A thread can be in several states, as shown in this simple diagram:

[Figure: thread states diagram]

When a thread has just been created (that is, instantiated), its state is "ready":

In [42]:
def simple_worker():
    print('Thread running...')
    time.sleep(5)
    print('Thread finished...')
In [43]:
t = Thread(target=simple_worker)
In [44]:
t.is_alive()
Out[44]:
False

We'll now start the thread, to move it from "ready" to "runnable".

In [45]:
t.start()
Thread running...
In [46]:
t.is_alive()
Out[46]:
True

Waiting for a thread to finish, using join():

In [47]:
t.join()
Thread finished...
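join() also accepts a timeout in seconds. Since join() always returns None, the way to tell whether the thread actually finished is to check is_alive() afterwards. A small sketch walking one thread through its states (the 0.5 s and 0.1 s values are arbitrary):

```python
import threading
import time

def worker():
    time.sleep(0.5)  # keep the thread busy for a while

t = threading.Thread(target=worker)
before_start = t.is_alive()   # False: created, not started yet
t.start()
while_running = t.is_alive()  # True: worker() is still sleeping
t.join(timeout=0.1)           # gives up after 0.1s; join() always returns None
after_timeout = t.is_alive()  # still True, so we know the timeout expired
t.join()                      # no timeout: wait until worker() returns
after_join = t.is_alive()     # False: the thread has finished

print(before_start, while_running, after_timeout, after_join)  # False True True False
```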

A thread that has finished can't be started again, as shown in the following example:

In [48]:
t.start()
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-48-d161150ae45f> in <module>
----> 1 t.start()

~/.pyenv/versions/3.8.0/lib/python3.8/threading.py in start(self)
    846 
    847         if self._started.is_set():
--> 848             raise RuntimeError("threads can only be started once")
    849         with _active_limbo_lock:
    850             _limbo[self] = self

RuntimeError: threads can only be started once

Important: It's not possible(*) to manage thread states manually; for example, you can't stop a thread from the outside. A thread always has to run its natural cycle.

(*) You might find hacks on the internet for stopping threads, but relying on them is bad practice. We'll discuss this more later.
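One common way around this is cooperative cancellation: instead of killing the thread, another thread sets a flag that the worker checks regularly, and the worker decides to return on its own. Here's a minimal sketch using threading.Event as the flag; the names stop_requested and polling_worker are hypothetical:

```python
import threading
import time

stop_requested = threading.Event()  # shared flag the worker checks
ticks = []                           # one entry per loop iteration

def polling_worker():
    # Loop until another thread sets the event
    while not stop_requested.is_set():
        ticks.append(time.monotonic())
        # wait() acts as an interruptible sleep: it returns early if the event is set
        stop_requested.wait(0.05)

t = threading.Thread(target=polling_worker)
t.start()
time.sleep(0.2)
stop_requested.set()  # ask the worker to finish its current iteration and return
t.join()

print(t.is_alive(), len(ticks) > 0)  # False True
```

Note that the thread still runs its natural cycle: it only exits when its target function returns.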

Thread Identity

The Thread class has two attributes that let us identify each thread: the human-readable name, which we can set when we construct the thread, and the machine-oriented ident.

In [30]:
def simple_worker():
    print('Thread running...')
    time.sleep(5)
    print('Thread exiting...')
In [31]:
t = Thread(target=simple_worker)
In [32]:
t.name
Out[32]:
'Thread-18'

ident will be None until we start the thread.

In [33]:
t.ident is None
Out[33]:
True
In [34]:
t.start()
Thread running...
In [35]:
t.name
Out[35]:
'Thread-18'
In [36]:
t.ident
Out[36]:
123145565192192
Thread exiting...

We can create a thread and assign a custom name to it:

In [37]:
t = Thread(target=simple_worker, name='PyCon 2020 Tutorial!')
In [38]:
t.start()
Thread running...
In [39]:
t.name
Out[39]:
'PyCon 2020 Tutorial!'
In [40]:
t.ident
Out[40]:
123145565192192
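Thread exiting...

Notice that this ident is the same one the previous thread had: thread identifiers may be recycled once a thread exits and another one is created.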
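Besides ident, Python 3.8 added native_id, the ID the operating system assigns to the thread (available on the common platforms: Linux, macOS, Windows and a few others). A short sketch comparing them, where report_ids is a hypothetical helper:

```python
import threading

ids = {}  # filled in by the worker thread

def report_ids():
    me = threading.current_thread()
    ids['ident'] = me.ident                       # Python-level identifier
    ids['get_ident'] = threading.get_ident()      # same value, via a module function
    ids['native_id'] = me.native_id               # ID assigned by the OS (Python 3.8+)
    ids['get_native_id'] = threading.get_native_id()

t = threading.Thread(target=report_ids, name='Identity check')
before = (t.ident, t.native_id)  # (None, None): nothing assigned before start()
t.start()
t.join()
print(before, ids)
```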

A thread knows itself

It's also possible to know the identity of a thread from within the thread itself. This might seem counterintuitive, since the target function doesn't have a reference to the Thread object, but the module function threading.current_thread() returns it (and threading.get_ident() returns its ident).

In [41]:
def simple_worker():
    sleep_secs = random.randint(1, 5)
    myself = threading.current_thread()
    ident = threading.get_ident()
    print(f"I am thread {myself.name} (ID {ident}), and I'm sleeping for {sleep_secs}.")
    time.sleep(sleep_secs)
    print(f'Thread {myself.name} exiting...')
In [42]:
t1 = Thread(target=simple_worker, name='Bubbles')
t2 = Thread(target=simple_worker, name='Blossom')
t3 = Thread(target=simple_worker, name='Buttercup')
In [43]:
t1.start()
I am thread Bubbles (ID 123145565192192), and I'm sleeping for 2.
In [44]:
t2.start()
I am thread Blossom (ID 123145581981696), and I'm sleeping for 4.
In [45]:
t3.start()
I am thread Buttercup (ID 123145598771200), and I'm sleeping for 4.
In [46]:
print('Waiting...')
Thread Bubbles exiting...
Waiting...
Thread Blossom exiting...
Thread Buttercup exiting...
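The threading module can also list every thread that is currently alive with threading.enumerate(), and threading.main_thread() returns the thread the interpreter started with. A sketch, assuming the workers should stay alive until the main thread releases them via a hypothetical release event:

```python
import threading

release = threading.Event()  # lets the main thread decide when the workers may exit

def idle_worker():
    release.wait()  # block here until release.set() is called

names = ('Bubbles', 'Blossom', 'Buttercup')
workers = [threading.Thread(target=idle_worker, name=n) for n in names]
for w in workers:
    w.start()

# enumerate() lists every thread that is currently alive, including the main thread
alive_names = sorted(t.name for t in threading.enumerate())
is_main = threading.current_thread() is threading.main_thread()

release.set()
for w in workers:
    w.join()
print(alive_names, is_main)
```

In a notebook, alive_names will usually include extra threads started by Jupyter itself.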

Passing parameters to threads

Passing parameters is simple with the Thread constructor; just use the args argument:

In [47]:
def simple_worker(time_to_sleep):
    myself = threading.current_thread()
    ident = threading.get_ident()
    print(f"I am thread {myself.name} (ID {ident}), and I'm sleeping for {time_to_sleep}.")
    time.sleep(time_to_sleep)
    print(f'Thread {myself.name} exiting...')
In [48]:
t1 = Thread(target=simple_worker, name='Bubbles', args=(3, ))
t2 = Thread(target=simple_worker, name='Blossom', args=(1.5, ))
t3 = Thread(target=simple_worker, name='Buttercup', args=(2, ))
In [49]:
t1.start()
I am thread Bubbles (ID 123145565192192), and I'm sleeping for 3.
In [50]:
t2.start()
I am thread Blossom (ID 123145581981696), and I'm sleeping for 1.5.
In [51]:
t3.start()
I am thread Buttercup (ID 123145598771200), and I'm sleeping for 2.
Thread Blossom exiting...
Thread Buttercup exiting...
Thread Bubbles exiting...
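Keyword arguments go in the kwargs dict. Also note that the trailing comma in args=('Bubbles',) matters: without it, ('Bubbles') is just a string, not a one-element tuple. A sketch with a hypothetical greet function:

```python
import threading

results = {}  # written by the worker threads, read by the main thread after join()

def greet(name, punctuation='!', times=1):
    # Store the result under the thread's name so we can tell the calls apart
    results[threading.current_thread().name] = f'Hello {name}{punctuation}' * times

t1 = threading.Thread(target=greet, name='positional', args=('Bubbles',))
t2 = threading.Thread(target=greet, name='keywords',
                      args=('Blossom',), kwargs={'punctuation': '?', 'times': 2})
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()

print(results['positional'], results['keywords'])  # Hello Bubbles! Hello Blossom?Hello Blossom?
```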

Subclassing Thread

So far, we've created threads by passing a target function to be executed. There's an alternative, more object-oriented way to do it: extending the Thread class and overriding its run() method:

In [52]:
class MyThread(Thread):
    def __init__(self, time_to_sleep, name=None):
        super().__init__(name=name)
        self.time_to_sleep = time_to_sleep
        
    def run(self):
        ident = threading.get_ident()
        print(f"I am thread {self.name} (ID {ident}), and I'm sleeping for {self.time_to_sleep} secs.")
        time.sleep(self.time_to_sleep)
        print(f'Thread {self.name} exiting...')
In [53]:
t = MyThread(2)
In [54]:
t.start()
I am thread Thread-19 (ID 123145565192192), and I'm sleeping for 2 secs.
Thread Thread-19 exiting...
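A handy side effect of subclassing is that run() can store its result on the instance, where the main thread can read it once join() returns. A minimal sketch; SquareThread is a hypothetical class, not part of the threading module:

```python
import threading
import time

class SquareThread(threading.Thread):
    """Computes n * n in run() and keeps the result on the instance."""

    def __init__(self, n, name=None):
        super().__init__(name=name)
        self.n = n
        self.result = None  # filled in by run()

    def run(self):
        time.sleep(0.1)     # pretend this is slow work
        self.result = self.n * self.n

threads = [SquareThread(n, name=f'square-{n}') for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join(), run() has finished, so result is safe to read

print([t.result for t in threads])  # [0, 1, 4, 9, 16]
```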

Shared Data

As we discussed in the previous lecture, threads can access shared data within the process they live in. Example:

In [63]:
TIME_TO_SLEEP = 1.5
In [64]:
def simple_worker():
    myself = threading.current_thread()
    print(f"I am thread {myself.name}, and I'm sleeping for {TIME_TO_SLEEP}.")
    time.sleep(TIME_TO_SLEEP)
    print(f'Thread {myself.name} exiting...')
In [65]:
t1 = Thread(target=simple_worker, name='Bubbles')
t2 = Thread(target=simple_worker, name='Blossom')
t3 = Thread(target=simple_worker, name='Buttercup')
In [66]:
t1.start()
I am thread Bubbles, and I'm sleeping for 1.5.
In [67]:
t2.start()
I am thread Blossom, and I'm sleeping for 1.5.
In [68]:
t3.start()
Thread Bubbles exiting...
I am thread Buttercup, and I'm sleeping for 1.5.
Thread Blossom exiting...
Thread Buttercup exiting...

How is this possible?

Remember, all threads live within the same process, and the variable TIME_TO_SLEEP is stored in the process. So all the threads created can access that variable.

[Figure: threads sharing data within a process]
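Shared data isn't only readable: threads can also write to it. In this sketch (store_length is a hypothetical function) each thread writes a different key of a shared dict, so no two threads ever update the same entry. When several threads update the same value, we run into the race conditions discussed later in the course.

```python
import threading

shared_results = {}  # lives in the process, visible to every thread

def store_length(word):
    # Each thread writes to its own key, so no two threads touch the same entry
    shared_results[word] = len(word)

words = ['Bubbles', 'Blossom', 'Buttercup']
threads = [threading.Thread(target=store_length, args=(w,)) for w in words]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared_results)  # key order depends on which thread finished first
```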

A real example

To see a real example of threads, we'll use the live Cryptowatch API, which returns the prices of different cryptocurrencies (Bitcoin, Litecoin, Ether, etc.). We'll simulate some delay using Slowwly, a service that purposely slows down our requests.

In [55]:
import requests
In [58]:
DELAY = 1100
In [78]:
SLOWWLY_BASE_URL = f"http://slowwly.robertomurray.co.uk/delay/{DELAY}/url/"
In [79]:
CRYPTOWATCH_BASE_URL = "https://api.cryptowat.ch/markets/{exchange}/btcusd/price"
In [80]:
resp = requests.get(CRYPTOWATCH_BASE_URL.format(exchange='kraken'))
In [81]:
resp.json()
Out[81]:
{'result': {'price': 19220},
 'allowance': {'cost': 0.005,
  'remaining': 8.45,
  'upgrade': 'For unlimited API access, create an account at https://cryptowat.ch'}}
Sequential test:

We'll try to get the price from all 3 exchanges using a sequential approach:

In [83]:
EXCHANGES = ['bitstamp', 'bitfinex', 'kraken']
In [82]:
start = time.time()
In [84]:
for exchange in EXCHANGES:
    url = CRYPTOWATCH_BASE_URL.format(exchange=exchange)
    resp = requests.get(SLOWWLY_BASE_URL + url)
    print(f"{exchange.title()}: ${resp.json()['result']['price']}")
Bitstamp: $19218.42
Bitfinex: $19236
Kraken: $19219.9
In [76]:
time.time() - start
Out[76]:
5.909564971923828

Let's now move it to threads! For now, we'll just print the output, as we'll see how to share data between threads in later lessons.

In [85]:
def check_price(exchange):
    url = CRYPTOWATCH_BASE_URL.format(exchange=exchange)
    resp = requests.get(SLOWWLY_BASE_URL + url)
    print(f"{exchange.title()}: ${resp.json()['result']['price']}")
In [86]:
check_price('bitfinex')
Bitfinex: $19235.57445
In [87]:
threads = [
    Thread(target=check_price, args=(exchange, ))
    for exchange in EXCHANGES
]
In [88]:
start = time.time()
In [89]:
for t in threads:
    t.start()
In [90]:
for t in threads:
    t.join()
Kraken: $19221
Bitstamp: $19201.23
Bitfinex: $19230.1394226
In [91]:
time.time() - start
Out[91]:
2.1012091636657715
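The speedup comes from the threads waiting on the network at the same time: blocking I/O (and time.sleep) releases CPython's GIL, so the waits overlap. The same comparison can be reproduced offline with a hypothetical fake_check_price that sleeps instead of calling the API; the 0.5 s delay is an assumption standing in for the real request time:

```python
import threading
import time

EXCHANGES = ['bitstamp', 'bitfinex', 'kraken']
FAKE_DELAY = 0.5  # stand-in for the network + Slowwly delay

def fake_check_price(exchange, prices):
    """Hypothetical stand-in for check_price(): sleeps instead of calling the API."""
    time.sleep(FAKE_DELAY)
    prices[exchange] = None  # a real version would store the price here

# Sequential: each call waits for the previous one
seq_prices = {}
start = time.perf_counter()
for exchange in EXCHANGES:
    fake_check_price(exchange, seq_prices)
sequential_secs = time.perf_counter() - start

# Threaded: all three waits overlap
thr_prices = {}
threads = [threading.Thread(target=fake_check_price, args=(e, thr_prices)) for e in EXCHANGES]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_secs = time.perf_counter() - start

print(f'sequential: {sequential_secs:.2f}s, threaded: {threaded_secs:.2f}s')
```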

How many threads can we start?

Let's say we need to get prices for 10 exchanges and 3 symbols, over 30 days. That's a lot of requests:

In [85]:
10 * 3 * 30
Out[85]:
900

Can we start 900 threads all at once? Technically we could, but we shouldn't: each thread consumes resources (memory for its stack, for example), and too many threads usually become a problem.

So, what can we do when we need to process too many concurrent jobs? We'll create a fixed number of workers and use a producer-consumer model. But first, we need to talk about shared data, race conditions and synchronization...
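For reference, the standard library already ships a ready-made version of the "fixed number of workers" idea: concurrent.futures.ThreadPoolExecutor keeps a bounded pool of threads and hands jobs to them as they become free. A sketch with 900 hypothetical jobs; fake_fetch and max_workers=20 are assumptions, not values from the lesson:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(job_id):
    """Hypothetical job: sleeps briefly to mimic an API call, then returns its id."""
    time.sleep(0.01)
    return job_id

jobs = range(900)  # 10 exchanges * 3 symbols * 30 days
with ThreadPoolExecutor(max_workers=20) as pool:
    # map() runs the jobs on at most 20 threads and yields results in input order
    results = list(pool.map(fake_fetch, jobs))

print(len(results))  # 900
```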

Summary:

  • threading module ✅
  • _thread module ❌

A thread's life cycle is Instantiated > Started > Running > Finished.
