# Concurrency-10-Queues_prod_consumer

Last updated: February 17th, 2021
In [1]:
import time
import random
from datetime import datetime, timedelta
from threading import Thread

import requests


## The Producer / Consumer model

In the Real Life example from one of our previous lessons, we worked with a simple server that returns price information for cryptocurrencies. We've packaged that server as a local program to use for testing. It should already be installed and running. If it's not, try running the following commands in your terminal:

$ pip install crypto-database
$ crypto_db


We can check what exchanges or cryptos are available with a few commands:

In [4]:
!curl http://localhost:8080/exchanges

["bitfinex", "bitstamp", "bittrex", "cexio", "coinbase-pro", "hitbtc", "huobi", "kraken", "mexbt", "okcoin", "okex", "poloniex"]
In [5]:
!curl http://localhost:8080/symbols

["btc", "eth", "ltc"]
In [7]:
!curl http://localhost:8080/price/bitstamp/btc/2020-09-01

{"exchange": "bitstamp", "symbol": "btc", "open": 11713.11, "high": 11780, "low": 11573.39, "close": 11655, "volume": 4534.21810201, "day": "2020-09-01"}

There are 12 exchanges, 3 symbols, and approximately 2 years of data. Suppose we wanted to consult prices from 10 exchanges, for all 3 symbols, for an entire month. That'd give us a total of:

10 exchanges × 3 symbols × 30 days

In [8]:
10 * 3 * 30

Out[8]:
900

A total of 900 requests... We said we couldn't just start 900 threads at the same time. The solution is to use the "producer/consumer" model, in which some threads produce tasks to do and put them in a shared collection, and other threads "consume" the tasks from said collection and do the work.
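As a minimal sketch of the model (the doubling "work" and the names producer/consumer here are illustrative, not part of the crypto server example): the producer fills a shared queue, then several consumer threads drain it until it's empty.

```python
import queue
import threading

def producer(q, n):
    # Produce n task ids and enqueue them in the shared collection.
    for i in range(n):
        q.put(i)

def consumer(q, results):
    # Consume tasks until the queue is drained.
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            return
        results.append(task * 2)  # pretend "work": double the number

q = queue.Queue()
producer(q, 5)  # fill the queue before starting consumers
results = []
threads = [threading.Thread(target=consumer, args=(q, results)) for _ in range(2)]
[t.start() for t in threads]
[t.join() for t in threads]
print(sorted(results))  # [0, 2, 4, 6, 8]
```

Filling the queue before starting the consumers avoids a race where a consumer sees a momentarily empty queue and exits while the producer is still working; the real example below does the same thing.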

We'll create a pool of worker threads (we'll settle on the exact number later) that will constantly consume the pending tasks and check the prices:

We'll use the Python Queue class, from the queue module, as our shared collection; it's a thread-safe FIFO queue. The queue module has multiple queue variants available (LIFO, priority, bounded), but we'll just use Queue, which is unbounded unless you pass a maxsize.

Here are the basic methods of a Queue:

In [9]:
import queue
from queue import Queue

In [10]:
q = Queue()

In [11]:
q.empty()

Out[11]:
True
In [12]:
q.put('A')

In [13]:
q.put('B')

In [14]:
q.put('C')

In [15]:
q.empty()

Out[15]:
False
In [16]:
q.qsize()

Out[16]:
3
In [17]:
q.get()

Out[17]:
'A'
In [18]:
q.get()

Out[18]:
'B'
In [19]:
q.get()

Out[19]:
'C'
In [20]:
q.empty()

Out[20]:
True

Queues are especially designed for multithreaded applications following the producer/consumer model. If we try to get from the queue now that it's empty, the call will block, waiting for more "work" to be added (we'll have to interrupt it):

In [21]:
q.get()

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
----> 1 q.get()

/usr/local/lib/python3.8/queue.py in get(self, block, timeout)
168             elif timeout is None:
169                 while not self._qsize():
--> 170                     self.not_empty.wait()
171             elif timeout < 0:
172                 raise ValueError("'timeout' must be a non-negative number")

300         try:    # restore state no matter what (e.g., KeyboardInterrupt)
301             if timeout is None:
--> 302                 waiter.acquire()
303                 gotit = True
304             else:

KeyboardInterrupt: 

The Queue.get method has an interface similar to Lock.acquire. It can return immediately, raising a queue.Empty exception:

In [22]:
q.get(block=False)

---------------------------------------------------------------------------
Empty                                     Traceback (most recent call last)
<ipython-input-22-16dd5ea810c3> in <module>
----> 1 q.get(block=False)

/usr/local/lib/python3.8/queue.py in get(self, block, timeout)
165             if not block:
166                 if not self._qsize():
--> 167                     raise Empty
168             elif timeout is None:
169                 while not self._qsize():

Empty: 

Or accept a timeout (also raising queue.Empty once it expires):

In [23]:
q.get(timeout=1)

---------------------------------------------------------------------------
Empty                                     Traceback (most recent call last)
<ipython-input-23-41e2ebc372a4> in <module>
----> 1 q.get(timeout=1)

/usr/local/lib/python3.8/queue.py in get(self, block, timeout)
176                     remaining = endtime - time()
177                     if remaining <= 0.0:
--> 178                         raise Empty
179                     self.not_empty.wait(remaining)
180             item = self._get()

Empty: 

Queues can also be used to limit the concurrency level of your program: you can set an upper limit on how many elements can be placed in the queue. When that limit is reached, the put operation will block because the queue is "full":

In [24]:
q = Queue(maxsize=1)

In [25]:
q.put('A')

In [26]:
q.qsize()

Out[26]:
1

This will block:

In [27]:
q.put('B')

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-27-e67db6c86bf5> in <module>
----> 1 q.put('B')

/usr/local/lib/python3.8/queue.py in put(self, item, block, timeout)
137                 elif timeout is None:
138                     while self._qsize() >= self.maxsize:
--> 139                         self.not_full.wait()
140                 elif timeout < 0:
141                     raise ValueError("'timeout' must be a non-negative number")

300         try:    # restore state no matter what (e.g., KeyboardInterrupt)
301             if timeout is None:
--> 302                 waiter.acquire()
303                 gotit = True
304             else:

KeyboardInterrupt: 

Similarly to get, the put method accepts block and timeout parameters:

In [28]:
q.put('B', block=False)

---------------------------------------------------------------------------
Full                                      Traceback (most recent call last)
<ipython-input-28-e2ce1ab1ff43> in <module>
----> 1 q.put('B', block=False)

/usr/local/lib/python3.8/queue.py in put(self, item, block, timeout)
134                 if not block:
135                     if self._qsize() >= self.maxsize:
--> 136                         raise Full
137                 elif timeout is None:
138                     while self._qsize() >= self.maxsize:

Full: 
In [29]:
q.put('B', timeout=1)

---------------------------------------------------------------------------
Full                                      Traceback (most recent call last)
<ipython-input-29-76a16c193f14> in <module>
----> 1 q.put('B', timeout=1)

/usr/local/lib/python3.8/queue.py in put(self, item, block, timeout)
145                         remaining = endtime - time()
146                         if remaining <= 0.0:
--> 147                             raise Full
148                         self.not_full.wait(remaining)
149             self._put(item)

Full: 
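To see the bounded queue unblock in practice, a small sketch (the 0.5-second delay is illustrative): a background thread gets an item after a short pause, which frees a slot and lets the blocking put complete.

```python
import threading
import time
from queue import Queue

q = Queue(maxsize=1)
q.put('A')  # the queue is now full

def free_slot_later():
    # After a short delay, consume one item to make room for the pending put.
    time.sleep(0.5)
    q.get()

threading.Thread(target=free_slot_later).start()

start = time.time()
q.put('B')  # blocks until the background thread frees a slot
elapsed = time.time() - start
print(f"put('B') unblocked after ~{elapsed:.1f}s")
```

This blocking behavior is what gives a bounded queue its backpressure: producers are automatically paused until consumers catch up.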

#### Tracking work done

Queues additionally include a useful task_done() method that is used to track how many tasks have been completed (a matching Queue.join() call blocks until every task has been marked done). In pseudocode, a worker usually looks like:

def worker(q):
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            print("All work done. Exiting")
            return
        do_work(task)
        q.task_done()
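A runnable version of that pattern, pairing task_done() with Queue.join() so the main thread can wait until every task has been processed (the doubling "work" is just a placeholder):

```python
import queue
import threading

q = queue.Queue()
done = []

def worker(q):
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            return
        done.append(task * 2)  # placeholder for real work
        q.task_done()          # mark this task as fully processed

for i in range(10):
    q.put(i)

threads = [threading.Thread(target=worker, args=(q,)) for _ in range(3)]
[t.start() for t in threads]

q.join()  # blocks until task_done() has been called once per put()
print(len(done))  # 10
```

Note that q.join() waits on the count of unfinished tasks, not on the threads themselves: forgetting task_done() would make it block forever.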


#### A Real example

We'll now use our knowledge of queues to check multiple prices from our crypto server using threads. We'll start first with the list of exchanges we want to use:

In [30]:
BASE_URL = "http://localhost:8080"

In [31]:
resp = requests.get(f"{BASE_URL}/exchanges")

In [32]:
resp

Out[32]:
<Response [200]>
In [33]:
EXCHANGES = resp.json()
EXCHANGES

Out[33]:
['bitfinex',
'bitstamp',
'bittrex',
'cexio',
'coinbase-pro',
'hitbtc',
'huobi',
'kraken',
'mexbt',
'okcoin',
'okex',
'poloniex']

We'll use all the exchanges available in the server. We'll ask for 31 days, from March 1st to March 31st:

In [34]:
START_DATE = datetime(2020, 3, 1)

In [35]:
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]

In [36]:
DATES

Out[36]:
['2020-03-01',
'2020-03-02',
'2020-03-03',
'2020-03-04',
'2020-03-05',
'2020-03-06',
'2020-03-07',
'2020-03-08',
'2020-03-09',
'2020-03-10',
'2020-03-11',
'2020-03-12',
'2020-03-13',
'2020-03-14',
'2020-03-15',
'2020-03-16',
'2020-03-17',
'2020-03-18',
'2020-03-19',
'2020-03-20',
'2020-03-21',
'2020-03-22',
'2020-03-23',
'2020-03-24',
'2020-03-25',
'2020-03-26',
'2020-03-27',
'2020-03-28',
'2020-03-29',
'2020-03-30',
'2020-03-31']

And for all available symbols:

In [37]:
resp = requests.get(f"{BASE_URL}/symbols")

In [38]:
resp

Out[38]:
<Response [200]>
In [39]:
SYMBOLS = resp.json()
SYMBOLS

Out[39]:
['btc', 'eth', 'ltc']

In total, we'll check the following number of prices:

In [40]:
len(EXCHANGES) * len(SYMBOLS) * len(DATES)

Out[40]:
1116

Let's first write the function:

In [41]:
def check_price(exchange, symbol, date, base_url=BASE_URL):
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()

In [43]:
exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
exchange, symbol, date

Out[43]:
('kraken', 'btc', '2020-03-09')
In [44]:
check_price(exchange, symbol, date)

Out[44]:
{'exchange': 'kraken',
'symbol': 'btc',
'open': 8854.6,
'high': 8854.6,
'low': 7850,
'close': 8025,
'volume': 39.62793601,
'day': '2020-03-09'}

We'll now create our queue:

In [45]:
tasks = Queue()


And we'll initialize it with all the "tasks" to finish:

In [46]:
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            tasks.put(task)

In [47]:
tasks.qsize()

Out[47]:
1116

This is the task dictionary that will be consumed by our workers:

In [48]:
task

Out[48]:
{'exchange': 'poloniex', 'symbol': 'ltc', 'date': '2020-03-31'}

We'll create a specialized class to store the results:

In [49]:
class PriceResults:
    def __init__(self):
        results = {}
        for exchange in EXCHANGES:
            results[exchange] = {}
            for date in DATES:
                results[exchange][date] = {}
                for symbol in SYMBOLS:
                    results[exchange][date][symbol] = None
        self._results = results

    def put_price(self, price, exchange, symbol, date):
        self._results[exchange][date][symbol] = price

    def get_price(self, exchange, symbol, date):
        return self._results[exchange][date][symbol]


Warning! We must be sure to use a thread-safe collection if multiple threads can write to it at the same time. In this case there are no duplicated tasks, which means only one thread will ever write to a given slot. If that weren't the case, we could use a thread-safe queue to store the results instead.
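If writes could collide, one alternative (a sketch, with illustrative names and squaring as placeholder work) is to have every worker push its results into a second thread-safe queue and drain it in the main thread once the workers are done:

```python
import queue
import threading

tasks_q = queue.Queue()
results_q = queue.Queue()  # thread-safe sink shared by all workers

def worker():
    while True:
        try:
            n = tasks_q.get(block=False)
        except queue.Empty:
            return
        results_q.put((n, n * n))  # workers only ever put; no shared writes

for i in range(100):
    tasks_q.put(i)

threads = [threading.Thread(target=worker) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]

# Drain the results in the main thread, after all workers have exited.
results = dict(results_q.get() for _ in range(results_q.qsize()))
print(len(results))  # 100
```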

Now, let's define the worker function that will consume the queue and check the price:

In [50]:
def worker(task_queue, results):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results.put_price(price, exchange, symbol, date)
        task_queue.task_done()


Now it's time to initialize our workers. How many should there be? It's very hard to know upfront what the performance limit of the current system will be. The concurrent.futures package uses the following formula by default: min(32, os.cpu_count() + 4), so at most 32 threads. We can use that number to try things out, but this is the point where profiling becomes necessary.
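That default (used by concurrent.futures.ThreadPoolExecutor since Python 3.8) can be computed directly:

```python
import os

# ThreadPoolExecutor's default max_workers in Python 3.8+:
# cpu_count() + 4 threads, capped at 32 so many-core machines
# don't spawn huge numbers of threads for I/O-bound work.
default_workers = min(32, (os.cpu_count() or 1) + 4)
print(default_workers)
```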

In [51]:
results = PriceResults()

In [52]:
MAX_WORKERS = 32

In [53]:
threads = [Thread(target=worker, args=(tasks, results)) for _ in range(MAX_WORKERS)]


And now we're ready! We can start the threads and wait for the queue to empty:

In [54]:
[t.start() for t in threads];

In [55]:
tasks.join()

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.

In [56]:
tasks.qsize()

Out[56]:
0
In [57]:
any([t.is_alive() for t in threads])

Out[57]:
False

And that's it! Our workers have processed all the tasks available. Let's check a few samples:

In [58]:
for _ in range(5):
    exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
    price = results.get_price(exchange, symbol, date)
    if price:
        print(f"{exchange.title():<20} price of {symbol.upper():^5} on {date:^10} was: ${round(price['close'], 4):>9}")
    else:
        print(f"No price of {symbol.upper()} for {exchange.title()} on {date}")

Bittrex              price of  ETH  on 2020-03-11 was: $   200.45
Bittrex              price of  ETH  on 2020-03-17 was: $  111.196
Coinbase-Pro         price of  BTC  on 2020-03-23 was: $  5818.25
No price of LTC for Hitbtc on 2020-03-16
Bittrex              price of  ETH  on 2020-03-04 was: $  223.702