import time
import threading
import random
from threading import Thread
from datetime import datetime, timedelta
import requests
The Producer / Consumer model¶
In our Real Life example from one of our previous lessons, we worked with a simple server that returns price information for cryptocurrencies. We've packaged that server as a local program to use for testing. It should already be installed and running. If it's not, try running the following commands in your terminal:
$ pip install crypto-database
$ crypto_db
We can check which exchanges or cryptos are available with a few commands:
!curl http://localhost:8080/exchanges
!curl http://localhost:8080/symbols
!curl http://localhost:8080/price/bitstamp/btc/2020-09-01
There are around 12 exchanges, 3 symbols and approximately 2 years of data. Suppose we wanted to consult prices for 10 exchanges, for all 3 symbols, for an entire month. That'd give us a total of:
10 exchanges × 3 symbols × 30 days:
10 * 3 * 30
A total of 900 requests... We said we couldn't just start 900 threads at the same time. The solution is to use the "producer/consumer" model, in which some threads produce tasks to do and put them in a shared collection, and other threads "consume" the tasks from said collection and do the work.
We'll create a pool of threads, say 10, which will constantly consume the pending tasks and check the prices:

We'll use Python's Queue class, from the queue module, as our shared collection; it's a thread-safe FIFO queue. The queue module has several queue variants available (LIFO, priority, bounded), including SimpleQueue, which is similar to Queue without the max element boundary. We'll stick with the regular Queue, though, because we'll later rely on its task-tracking methods (task_done and join), which SimpleQueue doesn't provide.
Here are the basic methods of a Queue:
import queue
from queue import Queue
q = Queue()
q.empty()   # True: nothing in the queue yet
q.put('A')
q.put('B')
q.put('C')
q.empty()   # False
q.qsize()   # 3
q.get()     # 'A': FIFO, first in, first out
q.get()     # 'B'
q.get()     # 'C'
q.empty()   # True again
Queues are specially designed to work with multithreaded applications in a producer/consumer model. If we try to get from the queue now that it's empty, it'll block waiting for more "work" to be added (we'll have to interrupt it):
q.get()
The Queue.get method has a similar interface to Lock.acquire. It can return immediately, raising a queue.Empty exception if there's nothing to consume:
q.get(block=False)
Or it can wait up to a given timeout, also raising queue.Empty if it expires:
q.get(timeout=1)
Queues can also be used to "limit" the concurrency level of your program. You can set an upper limit on how many elements can be placed in the queue. When that limit is reached, the put operation will block; the queue is "full":
q = Queue(maxsize=1)
q.put('A')
q.qsize()   # 1: the queue is now full
This will block:
q.put('B')
Similarly to get, the put method accepts block and timeout parameters:
q.put('B', block=False)   # raises queue.Full immediately
q.put('B', timeout=1)     # raises queue.Full after waiting 1 second
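To see a blocking put eventually succeed, here's a small sketch of our own (it assumes the bounded queue q from above still holds 'A'): a helper thread consumes the item after one second, freeing the slot:
def delayed_get():
    time.sleep(1)
    print('consumed:', q.get())   # frees the only slot in the queue

Thread(target=delayed_get).start()
q.put('B')   # blocks for about a second, then succeeds once 'A' is consumed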
Tracking work done¶
Queues additionally include a useful method, task_done(), that is used to track how many tasks have been completed: each item taken with get() should be followed by a task_done() call once it has been processed, and Queue.join() blocks until every item put in the queue has been accounted for. In pseudocode, the process is usually:
def worker(q):
    try:
        task = q.get(block=False)
    except queue.Empty:
        print("All work done. Exiting")
        return
    do_work(task)
    q.task_done()  # Notify the queue that the task was successfully finished
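To make that concrete, here's a minimal runnable sketch of the full pattern; do_work is just a hypothetical placeholder that prints the task:
def do_work(task):
    print(f'processing {task}')

def consume(q):
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            return
        do_work(task)
        q.task_done()  # one task fully processed

work = Queue()
for i in range(10):
    work.put(i)

consumers = [Thread(target=consume, args=(work,)) for _ in range(3)]
[t.start() for t in consumers]
work.join()   # returns once task_done() has been called for every item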
A Real example¶
We'll now use our knowledge of queues to check multiple prices from our crypto server using threads. We'll start with the list of exchanges we want to use:
BASE_URL = "http://localhost:8080"
resp = requests.get(f"{BASE_URL}/exchanges")
resp
EXCHANGES = resp.json()
EXCHANGES
We'll use all the exchanges available on the server. We'll ask for 31 days, from March 1st to March 31st:
START_DATE = datetime(2020, 3, 1)
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]
DATES
And for all available symbols:
resp = requests.get(f"{BASE_URL}/symbols")
resp
SYMBOLS = resp.json()
SYMBOLS
In total, we'll check the following number of prices:
len(EXCHANGES) * len(SYMBOLS) * len(DATES)
Let's first write the function that checks a single price:
def check_price(exchange, symbol, date, base_url=BASE_URL):
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()
exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
exchange, symbol, date
check_price(exchange, symbol, date)
We'll now create our queue:
tasks = Queue()
And we'll initialize it with all the "tasks" to finish:
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            tasks.put(task)
tasks.qsize()
This is the task dictionary that will be consumed by our workers:
task
We'll create a specialized class to store the results:
class PriceResults:
    def __init__(self):
        results = {}
        for exchange in EXCHANGES:
            results[exchange] = {}
            for date in DATES:
                results[exchange][date] = {}
                for symbol in SYMBOLS:
                    results[exchange][date][symbol] = None
        self._results = results

    def put_price(self, price, exchange, symbol, date):
        self._results[exchange][date][symbol] = price

    def get_price(self, exchange, symbol, date):
        return self._results[exchange][date][symbol]
Warning! We must be sure to use a thread-safe collection if multiple threads are writing to it at the same time. In this case there are no duplicated tasks, which means only one thread will ever write to a given spot, so a plain dictionary is safe. If that weren't the case, we could use a thread-safe queue to store the results as well.
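For illustration only, here's a sketch of that queue-based alternative (results_queue and drain_results are hypothetical names of ours, not part of the lesson's code):
results_queue = Queue()

# each worker would put its result into the queue instead of writing
# into the shared dictionary:
#     results_queue.put((exchange, symbol, date, price))

def drain_results(results_queue, results):
    # a single thread empties the queue safely once the workers are done
    while not results_queue.empty():
        exchange, symbol, date, price = results_queue.get()
        results.put_price(price, exchange, symbol, date)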
Now, let's define the worker function that will consume the queue and check the price:
def worker(task_queue, results):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results.put_price(price, exchange, symbol, date)
        task_queue.task_done()
Now it's time to initialize our workers. How many should we start? It's very hard to know upfront where the performance limit of the current system lies. The concurrent.futures package uses by default the formula min(32, os.cpu_count() + 4), so at most 32 threads. We can use that number to try things out, but this is the point where profiling becomes necessary.
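We can evaluate that formula on the current machine (the "or 1" guard is our addition, for the rare case where os.cpu_count() returns None):
import os

min(32, (os.cpu_count() or 1) + 4)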
results = PriceResults()
MAX_WORKERS = 32
threads = [Thread(target=worker, args=(tasks, results)) for _ in range(MAX_WORKERS)]
And now we're ready! We can start the threads and wait for the queue to empty:
[t.start() for t in threads];
tasks.join()   # blocks until task_done() has been called for every task
tasks.qsize()  # 0
any([t.is_alive() for t in threads])  # False: all workers have exited
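Note that tasks.join() waits for the tasks to be marked done, not for the threads themselves; to be thorough, we can also join the worker threads, which at this point should return immediately:
for t in threads:
    t.join()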
And that's it! Our workers have processed all the tasks available. Let's check a few samples:
for _ in range(5):
    exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
    price = results.get_price(exchange, symbol, date)
    if price:
        print(f"{exchange.title():<20} price of {symbol.upper():^5} on {date:^10} was: ${round(price['close'], 4):>9}")
    else:
        print(f"No price of {symbol.upper()} for {exchange.title()} on {date}")