import time
import random
import threading
from threading import Thread
In Python 3, threading is the module used to create and use threads. There's also a low-level module, _thread, but it's not recommended to use it directly. I'm mentioning it just as a warning: don't use _thread!
The most important class in the threading module is Thread (doh!).
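In a very simplified form, this is how a thread is instantiated:
class Thread:
    def __init__(self, target, name=None, args=(), kwargs=None):
        # target: the callable that will run in the new thread
        # args / kwargs: positional and keyword arguments passed to target
        pass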
(There's also a group argument, which should always be None, as it's reserved for future use.)
Here, target is the function that will be executed in that particular thread. Once a thread has been created (instantiated), we need to call its start() method for it to begin running.
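To make the "create, then start" distinction concrete, here's a small sketch (the worker function and the thread names 'Worker' and 'NeverStarted' are made up for illustration). It records which thread actually executes the target: nothing runs at construction time, start() runs the target in the new thread, and calling run() directly just executes it in the current thread, which is a common mistake.

```python
import threading

ran_in = []

def worker():
    # Record the name of the thread that actually executed this function
    ran_in.append(threading.current_thread().name)

t = threading.Thread(target=worker, name='Worker')
created_only = list(ran_in)   # still empty: creating a Thread doesn't call worker()
t.start()                     # worker() now runs in the new thread named 'Worker'
t.join()                      # wait for it to finish

t2 = threading.Thread(target=worker, name='NeverStarted')
t2.run()                      # calling run() directly executes worker() in the *current* thread
print(ran_in)
```

In other words, always use start(); run() is the method that start() invokes inside the new thread.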
Basic example of a thread
def simple_worker():
    print('hello', flush=True)
    time.sleep(5)
    print('world', flush=True)
t1 = Thread(target=simple_worker)
t1.start()
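Note that start() returns almost immediately: the worker keeps running concurrently while the main thread moves on. A rough sketch of that, assuming a shorter 0.5 second sleep and an events list (both just for illustration), measures how long start() takes versus how long join() has to wait:

```python
import threading
import time

events = []

def simple_worker():
    events.append('worker: hello')
    time.sleep(0.5)
    events.append('worker: world')

t = threading.Thread(target=simple_worker)
start = time.perf_counter()
t.start()                                   # returns right away; the worker runs concurrently
returned_after = time.perf_counter() - start
events.append('main: start() returned')
t.join()                                    # block until simple_worker() has finished
total = time.perf_counter() - start
print(events, round(returned_after, 3), round(total, 2))
```

The exact position of 'main: start() returned' relative to 'worker: hello' can vary between runs; only the last event is guaranteed.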
Running multiple threads in parallel
def simple_worker(thread_number):
    time.sleep(random.random() * 5)
    print(f'{thread_number} done!')
t1 = Thread(target=simple_worker, args=('Thread 1',))
t2 = Thread(target=simple_worker, args=('Thread 2',))
t1.start()
t2.start()
Notice that the order in which the threads finish may change from one run to the next, since each one sleeps for a random amount of time.
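If we replace the random sleep with fixed delays (0.4 and 0.1 seconds here, chosen arbitrarily), the finishing order becomes predictable, and we can also see that both sleeps overlap: the total time is close to the longest delay, not the sum of both. A minimal sketch:

```python
import threading
import time

finished = []

def simple_worker(thread_number, delay):
    time.sleep(delay)
    finished.append(thread_number)   # list.append is safe to call from several threads in CPython

t1 = threading.Thread(target=simple_worker, args=('Thread 1', 0.4))
t2 = threading.Thread(target=simple_worker, args=('Thread 2', 0.1))
start = time.perf_counter()
t1.start()
t2.start()
t1.join()
t2.join()
elapsed = time.perf_counter() - start
print(finished, round(elapsed, 1))
```

Thread 1 was started first, but Thread 2 finishes first because it sleeps for less time.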
Thread States
A thread can be in multiple states, as shown in this simple diagram:
When a thread has just been created (that is, instantiated), its state is "ready":
def simple_worker():
    print('Thread running...')
    time.sleep(5)
    print('Thread finished...')
t = Thread(target=simple_worker)
t.is_alive()  # False: the thread hasn't been started yet
We'll now start the thread to move it from "ready" to "runnable".
t.start()
t.is_alive()  # True while simple_worker is still running
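Waiting for a thread to finish:
t.join()  # blocks until simple_worker returns
A thread that has finished can't be started again, as shown in the following example:
t.start()  # raises RuntimeError: threads can only be started once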
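The whole life cycle can be checked in one go. This is a small sketch (with a 0.2 second sleep instead of 5, purely to keep it quick) that records is_alive() before starting, while running, and after join(), and then catches the RuntimeError raised by a second start():

```python
import threading
import time

def simple_worker():
    time.sleep(0.2)

t = threading.Thread(target=simple_worker)
states = [t.is_alive()]        # created, not started yet
t.start()
states.append(t.is_alive())    # running (sleeping)
t.join()
states.append(t.is_alive())    # finished

try:
    t.start()                  # a Thread object can only be started once
    restarted = True
except RuntimeError as exc:
    restarted = False
    error_message = str(exc)

print(states, restarted)
```

If you need to run the same work again, create a new Thread object.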
Important: It's not possible(*) to manage thread states manually, for example, to stop a thread. A thread always has to run its natural cycle.
(*) You might find hacks on the internet showing how to stop threads, but they're bad practice. We'll discuss this more later.
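Instead of killing a thread from outside, a common approach is cooperative: the worker periodically checks a flag and exits its loop on its own when asked. This sketch uses threading.Event as that flag; the names stop_requested and polling_worker are just illustrative, and it's one possible pattern, not the only one.

```python
import threading
import time

stop_requested = threading.Event()
iterations = 0

def polling_worker():
    global iterations
    # Keep working until the main thread asks us to stop
    while not stop_requested.is_set():
        iterations += 1
        stop_requested.wait(0.05)   # sleep, but wake up early if the event is set

t = threading.Thread(target=polling_worker)
t.start()
time.sleep(0.2)
stop_requested.set()      # politely ask the worker to finish
t.join(timeout=1)         # the worker notices the event and returns by itself
print(t.is_alive(), iterations)
```

The thread still "runs its natural cycle": it simply decides to finish early.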
Thread Identity
The Thread class has two attributes that let us identify each thread: the human-readable name, which we can set when we construct the thread, and the machine-oriented ident.
def simple_worker():
    print('Thread running...')
    time.sleep(5)
    print('Thread exiting...')
t = Thread(target=simple_worker)
t.name  # an auto-generated default name
ident will be None until the thread is started.
t.ident is None
t.start()
t.name
t.ident
We can create a thread and assign a custom name to it:
t = Thread(target=simple_worker, name='PyCon 2020 Tutorial!')
t.start()
t.name
t.ident
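Here's a self-contained sketch of both attributes side by side (the 0.1 second sleep is arbitrary). A thread created without a name gets an auto-generated one that starts with 'Thread-', and ident switches from None to an integer once start() has been called:

```python
import threading
import time

def simple_worker():
    time.sleep(0.1)

unnamed = threading.Thread(target=simple_worker)
named = threading.Thread(target=simple_worker, name='PyCon 2020 Tutorial!')

ident_before = named.ident    # None: not started yet
named.start()
ident_after = named.ident     # an integer, assigned once the thread has started
named.join()

print(unnamed.name)           # auto-generated, starts with 'Thread-'
print(named.name, ident_before, ident_after)
```

The exact default name differs slightly between Python versions, so don't rely on more than the 'Thread-' prefix.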
A thread knows itself
It's also possible to know the identity of a thread from within the thread itself. It might seem counterintuitive, since the running code doesn't hold a reference to the created object, but the module function threading.current_thread() provides access to it (and threading.get_ident() returns the current thread's ident).
def simple_worker():
    sleep_secs = random.randint(1, 5)
    myself = threading.current_thread()
    ident = threading.get_ident()
    print(f"I am thread {myself.name} (ID {ident}), and I'm sleeping for {sleep_secs}.")
    time.sleep(sleep_secs)
    print(f'Thread {myself.name} exiting...')
t1 = Thread(target=simple_worker, name='Bubbles')
t2 = Thread(target=simple_worker, name='Blossom')
t3 = Thread(target=simple_worker, name='Buttercup')
t1.start()
t2.start()
t3.start()
print('Waiting...')
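To convince ourselves that current_thread() and get_ident() really refer to the thread running the code, this sketch has each worker record what it sees about itself in a dictionary (the seen dict and the whoami function are illustrative), and afterwards compares that against the Thread objects created in the main thread:

```python
import threading

seen = {}

def whoami():
    me = threading.current_thread()
    # Both values describe the thread that is executing this function
    seen[me.name] = (threading.get_ident(), me.ident)   # each thread writes its own key

threads = [threading.Thread(target=whoami, name=n) for n in ('Bubbles', 'Blossom', 'Buttercup')]
for t in threads:
    t.start()
for t in threads:
    t.join()

for t in threads:
    print(t.name, seen[t.name][0] == t.ident)
```

Each entry's ident matches the ident of the Thread object created outside, so the thread did "know itself".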
Passing parameters to threads
Passing parameters is simple with the Thread constructor: just use the args argument (a tuple):
def simple_worker(time_to_sleep):
    myself = threading.current_thread()
    ident = threading.get_ident()
    print(f"I am thread {myself.name} (ID {ident}), and I'm sleeping for {time_to_sleep}.")
    time.sleep(time_to_sleep)
    print(f'Thread {myself.name} exiting...')
t1 = Thread(target=simple_worker, name='Bubbles', args=(3, ))
t2 = Thread(target=simple_worker, name='Blossom', args=(1.5, ))
t3 = Thread(target=simple_worker, name='Buttercup', args=(2, ))
t1.start()
t2.start()
t3.start()
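Keyword arguments work the same way through the kwargs argument, which takes a dict. A small sketch with a made-up greet function, mixing both:

```python
import threading

greetings = []

def greet(greeting, name, punctuation='.'):
    # Positional values come from args, keyword values from kwargs
    greetings.append(f'{greeting}, {name}{punctuation}')

t = threading.Thread(
    target=greet,
    args=('Hello',),                                # note the trailing comma: a 1-tuple
    kwargs={'name': 'PyCon', 'punctuation': '!'},
)
t.start()
t.join()
print(greetings)
```

The trailing comma matters: args=('Hello') would be a plain string, not a tuple, and each character would be passed as a separate argument.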
Subclassing Thread
So far, we've created threads by passing a target function to be executed. There's an alternative, more object-oriented way to do it: extending the Thread class and overriding its run() method:
class MyThread(Thread):
    def __init__(self, time_to_sleep, name=None):
        super().__init__(name=name)
        self.time_to_sleep = time_to_sleep
    def run(self):
        ident = threading.get_ident()
        print(f"I am thread {self.name} (ID {ident}), and I'm sleeping for {self.time_to_sleep} secs.")
        time.sleep(self.time_to_sleep)
        print(f'Thread {self.name} exiting...')
t = MyThread(2)
t.start()
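One nice side effect of subclassing is that the thread object can hold its own state. This hypothetical SleepyThread stores how long it actually slept in an attribute that the main thread reads after join() (a simple sketch; sharing data between threads in general is a bigger topic):

```python
import threading
import time

class SleepyThread(threading.Thread):
    """Hypothetical subclass that records how long it actually slept."""

    def __init__(self, time_to_sleep, name=None):
        super().__init__(name=name)   # always initialize the base Thread first
        self.time_to_sleep = time_to_sleep
        self.slept = None

    def run(self):
        # run() executes in the new thread once start() is called
        start = time.perf_counter()
        time.sleep(self.time_to_sleep)
        self.slept = time.perf_counter() - start

t = SleepyThread(0.2, name='Sleepy')
t.start()
t.join()                      # after join(), run() has finished and slept is set
print(t.name, round(t.slept, 1))
```

Forgetting the super().__init__() call is a common bug: Thread.start() will then fail because the base class was never initialized.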
Shared Data
As we discussed in our previous lecture, threads can access shared data within the process they live in. Example:
TIME_TO_SLEEP = 1.5
def simple_worker():
    myself = threading.current_thread()
    print(f"I am thread {myself.name}, and I'm sleeping for {TIME_TO_SLEEP}.")
    time.sleep(TIME_TO_SLEEP)
    print(f'Thread {myself.name} exiting...')
t1 = Thread(target=simple_worker, name='Bubbles')
t2 = Thread(target=simple_worker, name='Blossom')
t3 = Thread(target=simple_worker, name='Buttercup')
t1.start()
t2.start()
t3.start()
How is this possible?
Remember, all threads live within the same process, and TIME_TO_SLEEP is a global variable stored in that process's memory. So every thread we create can access it.
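Sharing works in both directions: a thread can also write to process memory, and the main thread sees the change. In this sketch each thread writes only to its own slot of a module-level list (shared_results and fill_slot are illustrative names), so no two threads ever touch the same element:

```python
import threading

shared_results = [None] * 3   # module-level list, visible to every thread

def fill_slot(index):
    # Each thread writes only to its own slot, so threads never touch the same element
    shared_results[index] = index * 10

threads = [threading.Thread(target=fill_slot, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(shared_results)
```

Things get trickier when several threads update the same piece of data, which is where race conditions appear.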
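import requests
# slowwly delays each proxied request by DELAY milliseconds, simulating a slow network
DELAY = 1100
SLOWWLY_BASE_URL = f"http://slowwly.robertomurray.co.uk/delay/{DELAY}/url/"
# Cryptowatch endpoint returning the BTC/USD price for a given exchange
CRYPTOWATCH_BASE_URL = "https://api.cryptowat.ch/markets/{exchange}/btcusd/price"
resp = requests.get(CRYPTOWATCH_BASE_URL.format(exchange='kraken'))
resp.json()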
Sequential test
We'll try to get the price from all 3 exchanges using a sequential approach:
EXCHANGES = ['bitstamp', 'bitfinex', 'kraken']
start = time.time()
for exchange in EXCHANGES:
    url = CRYPTOWATCH_BASE_URL.format(exchange=exchange)
    resp = requests.get(SLOWWLY_BASE_URL + url)
    print(f"{exchange.title()}: ${resp.json()['result']['price']}")
time.time() - start
Let's now move it to threads! For now, we'll just print the output, as we'll see data sharing in further lessons...
def check_price(exchange):
    url = CRYPTOWATCH_BASE_URL.format(exchange=exchange)
    resp = requests.get(SLOWWLY_BASE_URL + url)
    print(f"{exchange.title()}: ${resp.json()['result']['price']}")
check_price('bitfinex')
threads = [
    Thread(target=check_price, args=(exchange, ))
    for exchange in EXCHANGES
]
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
time.time() - start
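The comparison above needs network access to both services. Here's an offline sketch of the same experiment, assuming a hypothetical fake_check_price() that just sleeps for SIMULATED_DELAY seconds in place of the HTTP round trip. Because the waits overlap, the threaded version should take roughly one delay instead of three:

```python
import threading
import time

EXCHANGES = ['bitstamp', 'bitfinex', 'kraken']
SIMULATED_DELAY = 0.3   # seconds; stands in for the slow HTTP round trip

def fake_check_price(exchange):
    # Hypothetical stand-in for check_price(): sleeps instead of calling the API
    time.sleep(SIMULATED_DELAY)

def run_sequential():
    start = time.perf_counter()
    for exchange in EXCHANGES:
        fake_check_price(exchange)
    return time.perf_counter() - start

def run_threaded():
    threads = [threading.Thread(target=fake_check_price, args=(e,)) for e in EXCHANGES]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

sequential_time = run_sequential()   # roughly 3 * SIMULATED_DELAY
threaded_time = run_threaded()       # roughly 1 * SIMULATED_DELAY
print(round(sequential_time, 1), round(threaded_time, 1))
```

This works well for I/O-bound tasks like network requests, since a thread that is waiting doesn't block the others.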
How many threads can we start?
Let's say we need to get prices for 10 exchanges and 3 symbols over a period of 30 days. That's a lot of requests:
10 * 3 * 30  # 900 requests
Can we start 900 threads all at once? In practice, we shouldn't: each thread consumes resources (for example, memory for its own stack), and having too many threads running at once is usually a problem.
So, what can we do when we need to process a large number of concurrent jobs? We'll create a fixed set of workers and use a producer-consumer model. But first, we need to talk about shared data, race conditions and synchronization...
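As a rough preview of where we're heading, here's a minimal producer-consumer sketch: a small, fixed pool of workers (NUM_WORKERS = 4 is an arbitrary choice) pulls jobs from a thread-safe queue.Queue instead of starting 900 threads. The job itself (doubling a number after a tiny sleep) is a placeholder for a real request, and None is used as a "no more work" sentinel.

```python
import queue
import threading
import time

NUM_WORKERS = 4          # assumption: a small, fixed pool instead of 900 threads
jobs = queue.Queue()     # thread-safe FIFO shared by the producer and the workers
results = queue.Queue()

def worker():
    while True:
        job = jobs.get()          # blocks until a job is available
        if job is None:           # sentinel: no more work for this worker
            break
        time.sleep(0.001)         # stands in for a slow request
        results.put(job * 2)

workers = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()

# Producer: enqueue 900 jobs, then one sentinel per worker
for job in range(900):
    jobs.put(job)
for _ in workers:
    jobs.put(None)

for w in workers:
    w.join()

collected = sorted(results.get() for _ in range(results.qsize()))
print(len(collected))
```

Only four threads ever exist here, no matter how many jobs are queued; the queue takes care of handing work out safely.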
Summary
threading module ✅
_thread module ❌
A thread's life cycle is: Instantiated > Started > Running > Finished.