Concurrency-10-Queues_prod_consumer

Last updated: January 15th, 2021
In [32]:
import time
import threading
import random
from threading import Thread
from datetime import datetime, timedelta

import requests

The Producer / Consumer model

In the real-life example from one of our previous lessons, we worked with a simple server that returns price information for cryptocurrencies. We've packaged that server as a local program to use for testing. It should already be installed and running. If it's not, try running the following commands in your terminal:

$ pip install crypto-database
$ crypto_db

We can check which exchanges and cryptos are available with a couple of requests:

In [1]:
!curl http://localhost:8080/exchanges
["bitfinex", "bitstamp", "bittrex", "cexio", "coinbase-pro", "hitbtc", "huobi", "kraken", "mexbt", "okcoin", "okex", "poloniex"]
In [3]:
!curl http://localhost:8080/symbols
["btc", "eth", "ltc"]

There are 12 exchanges, 3 symbols and approximately 2 years of data. Suppose we wanted to consult prices for 10 exchanges, for all 3 symbols, for an entire month (30 days). That means one request per exchange, symbol and day, for a total of:

In [2]:
10 * 3 * 30
Out[2]:
900

That's 900 requests. As we said before, we can't just start 900 threads at the same time. The solution is to use the "producer/consumer" model, in which some threads produce tasks and put them in a shared collection, and other threads "consume" the tasks from that collection and do the work.

We'll create a pool of threads, let's say 10, that will constantly consume pending tasks and fetch the prices.

We'll use Python's Queue class, from the queue module, as our shared collection: a thread-safe FIFO queue. The queue module has other queues available (LifoQueue, PriorityQueue, and SimpleQueue, a simpler unbounded FIFO queue without task tracking), but we'll stick with Queue, because we'll rely on its task_done() and join() methods later.
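Before looking at the Queue methods one by one, here is a minimal sketch of the producer/consumer model described above. The names (producer, consumer, STOP, NUM_CONSUMERS) and the "work" (squaring a number) are assumptions made for illustration; they are not part of the crypto example.

```python
import threading
from queue import Queue

NUM_CONSUMERS = 3   # hypothetical pool size for this sketch
STOP = None         # sentinel value telling a consumer to exit

def producer(q, n_tasks):
    """Put n_tasks integers on the queue, then one sentinel per consumer."""
    for i in range(n_tasks):
        q.put(i)
    for _ in range(NUM_CONSUMERS):
        q.put(STOP)

def consumer(q, out, out_lock):
    """Take tasks until the sentinel arrives; the 'work' is squaring a number."""
    while True:
        task = q.get()          # blocks until something is available
        if task is STOP:
            return
        with out_lock:          # protect the shared output list
            out.append(task * task)

q = Queue()
squares = []
squares_lock = threading.Lock()

consumers = [
    threading.Thread(target=consumer, args=(q, squares, squares_lock))
    for _ in range(NUM_CONSUMERS)
]
for t in consumers:
    t.start()

prod = threading.Thread(target=producer, args=(q, 20))
prod.start()

prod.join()
for t in consumers:
    t.join()

print(sorted(squares))
```

Because the queue is FIFO, the sentinels are only reached after every real task has been taken, so each consumer exits exactly once and no task is lost.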

Here are the basic methods of a Queue:

In [4]:
import queue
from queue import Queue
In [5]:
q = Queue()
In [6]:
q.empty()
Out[6]:
True
In [7]:
q.put('A')
In [8]:
q.put('B')
In [9]:
q.put('C')
In [10]:
q.empty()
Out[10]:
False
In [11]:
q.qsize()
Out[11]:
3
In [12]:
q.get()
Out[12]:
'A'
In [13]:
q.get()
Out[13]:
'B'
In [14]:
q.get()
Out[14]:
'C'
In [15]:
q.empty()
Out[15]:
True
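The other queue classes mentioned earlier share the same put/get interface and differ only in the order in which items come out. The following sketch compares them; the drain helper is a hypothetical convenience for this example and is only safe when a single thread is using the queue.

```python
from queue import Queue, LifoQueue, PriorityQueue

def drain(q):
    """Remove and return every item in the queue (single-threaded use only)."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items

fifo, lifo, prio = Queue(), LifoQueue(), PriorityQueue()
for item in ['B', 'A', 'C']:
    fifo.put(item)
    lifo.put(item)
    prio.put(item)

fifo_order = drain(fifo)   # insertion order: ['B', 'A', 'C']
lifo_order = drain(lifo)   # reverse insertion order: ['C', 'A', 'B']
prio_order = drain(prio)   # smallest item first: ['A', 'B', 'C']

print(fifo_order, lifo_order, prio_order)
```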

Queues are specially designed to work with multithreaded applications in a producer/consumer model. If we try to get from the queue now that it's empty, the call will block, waiting for more "work" to be added (we'll have to interrupt it):

In [16]:
q.get()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-16-4afd527a82ad> in <module>
----> 1 q.get()

~/.pyenv/versions/3.8.0/lib/python3.8/queue.py in get(self, block, timeout)
    168             elif timeout is None:
    169                 while not self._qsize():
--> 170                     self.not_empty.wait()
    171             elif timeout < 0:
    172                 raise ValueError("'timeout' must be a non-negative number")

~/.pyenv/versions/3.8.0/lib/python3.8/threading.py in wait(self, timeout)
    300         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    301             if timeout is None:
--> 302                 waiter.acquire()
    303                 gotit = True
    304             else:

KeyboardInterrupt: 

The Queue.get method has an interface similar to Lock.acquire: it accepts block and timeout parameters. With block=False it returns immediately, raising a queue.Empty exception if there is nothing to get:

In [17]:
q.get(block=False)
---------------------------------------------------------------------------
Empty                                     Traceback (most recent call last)
<ipython-input-17-16dd5ea810c3> in <module>
----> 1 q.get(block=False)

~/.pyenv/versions/3.8.0/lib/python3.8/queue.py in get(self, block, timeout)
    165             if not block:
    166                 if not self._qsize():
--> 167                     raise Empty
    168             elif timeout is None:
    169                 while not self._qsize():

Empty: 

Or it can wait up to timeout seconds, raising the same queue.Empty exception if nothing arrives in time:

In [18]:
q.get(timeout=1)
---------------------------------------------------------------------------
Empty                                     Traceback (most recent call last)
<ipython-input-18-41e2ebc372a4> in <module>
----> 1 q.get(timeout=1)

~/.pyenv/versions/3.8.0/lib/python3.8/queue.py in get(self, block, timeout)
    176                     remaining = endtime - time()
    177                     if remaining <= 0.0:
--> 178                         raise Empty
    179                     self.not_empty.wait(remaining)
    180             item = self._get()

Empty: 
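In real code we usually catch queue.Empty instead of letting it propagate. A minimal sketch, assuming a hypothetical helper called poll that returns a default value when nothing is available:

```python
import queue
import time
from queue import Queue

def poll(q, wait=None, default=None):
    """Return the next item, or `default` if none is available.

    wait=None means "don't wait at all"; otherwise wait up to `wait` seconds.
    (This convention is an assumption of the sketch, not part of the queue API.)
    """
    try:
        if wait is None:
            return q.get(block=False)
        return q.get(timeout=wait)
    except queue.Empty:
        return default

demo_q = Queue()
demo_q.put('A')

first = poll(demo_q)             # 'A'
second = poll(demo_q)            # queue is empty now, so we get the default

start = time.monotonic()
third = poll(demo_q, wait=0.2)   # waits roughly 0.2 seconds, then gives up
waited = time.monotonic() - start

print(first, second, third)
```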

Queues can also be used to "limit" the concurrency level of your program. You can set an upper limit on how many elements can be placed in the queue with the maxsize parameter. When the limit is reached, the queue is "full" and the put operation will block:

In [19]:
q = Queue(maxsize=1)
In [20]:
q.put('A')
In [21]:
q.qsize()
Out[21]:
1

This will block:

In [22]:
q.put('B')
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-22-e67db6c86bf5> in <module>
----> 1 q.put('B')

~/.pyenv/versions/3.8.0/lib/python3.8/queue.py in put(self, item, block, timeout)
    137                 elif timeout is None:
    138                     while self._qsize() >= self.maxsize:
--> 139                         self.not_full.wait()
    140                 elif timeout < 0:
    141                     raise ValueError("'timeout' must be a non-negative number")

~/.pyenv/versions/3.8.0/lib/python3.8/threading.py in wait(self, timeout)
    300         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    301             if timeout is None:
--> 302                 waiter.acquire()
    303                 gotit = True
    304             else:

KeyboardInterrupt: 

Similarly to get, the put method accepts block and timeout parameters. When the queue is full, it raises a queue.Full exception:

In [23]:
q.put('B', block=False)
---------------------------------------------------------------------------
Full                                      Traceback (most recent call last)
<ipython-input-23-e2ce1ab1ff43> in <module>
----> 1 q.put('B', block=False)

~/.pyenv/versions/3.8.0/lib/python3.8/queue.py in put(self, item, block, timeout)
    134                 if not block:
    135                     if self._qsize() >= self.maxsize:
--> 136                         raise Full
    137                 elif timeout is None:
    138                     while self._qsize() >= self.maxsize:

Full: 
In [24]:
q.put('B', timeout=1)
---------------------------------------------------------------------------
Full                                      Traceback (most recent call last)
<ipython-input-24-76a16c193f14> in <module>
----> 1 q.put('B', timeout=1)

~/.pyenv/versions/3.8.0/lib/python3.8/queue.py in put(self, item, block, timeout)
    145                         remaining = endtime - time()
    146                         if remaining <= 0.0:
--> 147                             raise Full
    148                         self.not_full.wait(remaining)
    149             self._put(item)

Full: 
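The blocking behaviour of put is what lets a bounded queue slow down a fast producer. The sketch below, with hypothetical names and an artificial delay in the consumer, records the queue size after every put to show that it never exceeds maxsize.

```python
import threading
import time
from queue import Queue

bounded = Queue(maxsize=2)
sizes_after_put = []   # queue size observed by the producer after each put
received = []          # items in the order the consumer got them

def producer():
    for i in range(6):
        bounded.put(i)                        # blocks while the queue is full
        sizes_after_put.append(bounded.qsize())

def consumer():
    for _ in range(6):
        time.sleep(0.01)                      # simulate slow work
        received.append(bounded.get())

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()

print(received, max(sizes_after_put))
```

The producer can never get more than two items ahead of the consumer, which bounds the amount of pending work held in memory.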

Tracking work done

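Queues additionally include a useful method, task_done(), that is used to track how many tasks have been completed. Its counterpart, join(), blocks until every item that was put in the queue has been marked as done. In pseudocode (do_work is a placeholder), the process is usually:

def worker(q):
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            print("All work done. Exiting")
            return
        do_work(task)  # placeholder for the actual work
        q.task_done()  # Notify the queue that this task was finished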
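Here is a small runnable version of that pattern, with multiplication standing in for the real work. The names are illustrative. It also shows that calling task_done() more times than there were items raises a ValueError.

```python
import queue
import threading
from queue import Queue

work = Queue()
for n in range(5):
    work.put(n)

done = []

def demo_worker(q):
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            return
        done.append(task * 10)   # appending to a list is thread-safe in CPython
        q.task_done()            # one call per item taken from the queue

workers = [threading.Thread(target=demo_worker, args=(work,)) for _ in range(2)]
for t in workers:
    t.start()

work.join()          # returns once task_done() has been called 5 times
for t in workers:
    t.join()

# One call too many: the queue has no unfinished tasks left.
extra_call_error = None
try:
    work.task_done()
except ValueError as exc:
    extra_call_error = exc

print(sorted(done), repr(extra_call_error))
```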

A Real example

We'll now use our knowledge of queues to check multiple prices from our crypto server using threads. We'll start first with the list of exchanges we want to use:

In [33]:
BASE_URL = "http://localhost:8080"
In [34]:
resp = requests.get(f"{BASE_URL}/exchanges")
In [35]:
resp
Out[35]:
<Response [200]>
In [36]:
EXCHANGES = resp.json()
EXCHANGES
Out[36]:
['bitfinex',
 'bitstamp',
 'bittrex',
 'cexio',
 'coinbase-pro',
 'hitbtc',
 'huobi',
 'kraken',
 'mexbt',
 'okcoin',
 'okex',
 'poloniex']

We'll use all the exchanges available in the server. We'll ask for 31 days, from March 1st to March 31st:

In [38]:
START_DATE = datetime(2020, 3, 1)
In [39]:
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]
In [40]:
DATES
Out[40]:
['2020-03-01',
 '2020-03-02',
 '2020-03-03',
 '2020-03-04',
 '2020-03-05',
 '2020-03-06',
 '2020-03-07',
 '2020-03-08',
 '2020-03-09',
 '2020-03-10',
 '2020-03-11',
 '2020-03-12',
 '2020-03-13',
 '2020-03-14',
 '2020-03-15',
 '2020-03-16',
 '2020-03-17',
 '2020-03-18',
 '2020-03-19',
 '2020-03-20',
 '2020-03-21',
 '2020-03-22',
 '2020-03-23',
 '2020-03-24',
 '2020-03-25',
 '2020-03-26',
 '2020-03-27',
 '2020-03-28',
 '2020-03-29',
 '2020-03-30',
 '2020-03-31']

And for all available symbols:

In [41]:
resp = requests.get(f"{BASE_URL}/symbols")
In [42]:
resp
Out[42]:
<Response [200]>
In [43]:
SYMBOLS = resp.json()
SYMBOLS
Out[43]:
['btc', 'eth', 'ltc']

In total, we'll check the following number of prices:

In [44]:
len(EXCHANGES) * len(SYMBOLS) * len(DATES)
Out[44]:
1116

Let's first write the function that fetches a single price:

In [45]:
def check_price(exchange, symbol, date, base_url=BASE_URL):
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()
In [46]:
exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
exchange, symbol, date
Out[46]:
('poloniex', 'eth', '2020-03-07')
In [48]:
check_price(exchange, symbol, date)
Out[48]:
{'exchange': 'poloniex',
 'symbol': 'eth',
 'open': 228.37922989,
 'high': 245.10999999,
 'low': 227.59782195,
 'close': 244.875,
 'volume': 34895.05266286,
 'day': '2020-03-07'}

We'll now create our queue:

In [49]:
tasks = Queue()

And we'll initialize it with all the "tasks" to finish:

In [50]:
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            tasks.put(task)
In [51]:
tasks.qsize()
Out[51]:
1116
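The three nested loops can also be written with itertools.product, which yields every combination in the same order. A sketch using small sample lists (hypothetical values, so it runs without the server):

```python
from itertools import product
from queue import Queue

# Sample values for illustration only; the lesson uses the server's full lists.
sample_exchanges = ['bitfinex', 'bitstamp']
sample_dates = ['2020-03-01', '2020-03-02']
sample_symbols = ['btc', 'eth', 'ltc']

demo_tasks = Queue()
for exchange, date, symbol in product(sample_exchanges, sample_dates, sample_symbols):
    demo_tasks.put({'exchange': exchange, 'symbol': symbol, 'date': date})

# 2 exchanges * 2 dates * 3 symbols
print(demo_tasks.qsize())
```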

Each task is a dictionary like this one (the last one we created), which will be consumed by our workers:

In [52]:
task
Out[52]:
{'exchange': 'poloniex', 'symbol': 'ltc', 'date': '2020-03-31'}

We'll create a specialized class to store the results:

In [53]:
class PriceResults:
    def __init__(self):
        results = {}
        for exchange in EXCHANGES:
            results[exchange] = {}
            for date in DATES:
                results[exchange][date] = {}
                for symbol in SYMBOLS:
                    results[exchange][date][symbol] = None
        self._results = results
        
    def put_price(self, price, exchange, symbol, date):
        self._results[exchange][date][symbol] = price

    def get_price(self, exchange, symbol, date):
        return self._results[exchange][date][symbol]

Warning! We must be sure to use a thread-safe collection (or a lock) if multiple threads write to the same place at the same time. In this case, there are no duplicated tasks, which means that only one thread will write to any given slot, and all the nested dictionaries are created up front. If that weren't the case, we could use a lock or a thread-safe queue to store the results.
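If several threads could touch the same data, one option is to guard every access with a lock. The class below is a hypothetical variant of PriceResults, not the one used in this lesson; the writes counter is there only to show a read-modify-write operation that would be unsafe without the lock.

```python
import threading

class LockedPriceResults:
    """Hypothetical lock-protected variant of PriceResults."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results = {}
        self.writes = 0

    def put_price(self, price, exchange, symbol, date):
        with self._lock:
            self._results[(exchange, symbol, date)] = price
            self.writes += 1     # read-modify-write, safe only under the lock

    def get_price(self, exchange, symbol, date):
        with self._lock:
            return self._results.get((exchange, symbol, date))

locked = LockedPriceResults()

def write_many(worker_id):
    for day in range(100):
        locked.put_price(float(day), f"exchange-{worker_id}", "btc", f"day-{day}")

writers = [threading.Thread(target=write_many, args=(i,)) for i in range(8)]
for t in writers:
    t.start()
for t in writers:
    t.join()

print(locked.writes)
```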

Now, let's define the worker function that will consume the queue and check the price:

In [54]:
def worker(task_queue, results):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results.put_price(price, exchange, symbol, date)
        task_queue.task_done()
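One caveat with a worker like this: if the request raises an exception, the thread dies before calling task_done(), and a later join() on the queue would wait forever. A possible safeguard is to call task_done() in a finally block. In the sketch below, flaky_fetch is a made-up stand-in for check_price so the example runs without the server.

```python
import queue
import threading
from queue import Queue

def flaky_fetch(n):
    """Stand-in for check_price: fails on multiples of 3."""
    if n % 3 == 0:
        raise RuntimeError(f"could not fetch {n}")
    return n * 2

def safe_worker(task_queue, results, errors):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            return
        try:
            results[task] = flaky_fetch(task)
        except Exception as exc:
            errors[task] = exc          # record the failure and keep going
        finally:
            task_queue.task_done()      # always mark the task as done

pending = Queue()
for n in range(10):
    pending.put(n)

fetched, failed = {}, {}
pool = [threading.Thread(target=safe_worker, args=(pending, fetched, failed))
        for _ in range(4)]
for t in pool:
    t.start()

pending.join()    # returns even though some tasks failed
for t in pool:
    t.join()

print(sorted(fetched), sorted(failed))
```

Each task writes to its own key, so the two dictionaries don't need a lock here.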

Now it's time to initialize our workers. How many should we start? It's very hard to know upfront what the limit of the current system is in terms of performance. By default, ThreadPoolExecutor from the concurrent.futures package uses the following formula (since Python 3.8): min(32, os.cpu_count() + 4). So that's AT MOST 32 threads. We'll use 32 to try things out, but this is the point where profiling becomes necessary.
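As a quick check, the formula can be evaluated on the current machine, and the same number can be passed to a ThreadPoolExecutor. This is only a sketch: slow_square is a made-up task, and the `or 1` fallback covers the case where os.cpu_count() returns None.

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Same formula as above; os.cpu_count() may return None on some systems.
default_workers = min(32, (os.cpu_count() or 1) + 4)

def slow_square(n):
    """Made-up task standing in for a network request."""
    return n * n

with ThreadPoolExecutor(max_workers=default_workers) as pool:
    # map returns results in the order of the inputs
    pool_squares = list(pool.map(slow_square, range(10)))

print(default_workers, pool_squares)
```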

In [55]:
results = PriceResults()
In [56]:
MAX_WORKERS = 32
In [57]:
threads = [Thread(target=worker, args=(tasks, results)) for _ in range(MAX_WORKERS)]

And now we're ready! We can start the threads and wait until every task in the queue has been marked as done:

In [58]:
for t in threads:
    t.start()
In [59]:
tasks.join()
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.


Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.


Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.


Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.



Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.


Queue is empty! My work here is done. Exiting.
In [60]:
tasks.qsize()
Out[60]:
0
In [61]:
any([t.is_alive() for t in threads])
Out[61]:
False

And that's it! Our workers have processed all the tasks available. Let's check a few samples:

In [62]:
for _ in range(5):
    exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
    price = results.get_price(exchange, symbol, date)
    if price:
        print(f"{exchange.title():<20} price of {symbol.upper():^5} on {date:^10} was: ${round(price['close'], 4):>9}")
    else:
        print(f"No price of {symbol.upper()} for {exchange.title()} on {date}")
Cexio                price of  LTC  on 2020-03-22 was: $    38.86
Okex                 price of  ETH  on 2020-03-12 was: $   194.59
No price of BTC for Huobi on 2020-03-05
No price of ETH for Mexbt on 2020-03-15
Coinbase-Pro         price of  LTC  on 2020-03-08 was: $    60.57