Concurrency-13-Concurrent-Futures

Last updated: December 27th, 2020
In [1]:
import queue
import multiprocessing as mp
import concurrent.futures as cf
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

from queue import Queue, SimpleQueue

from datetime import datetime, timedelta

import requests


concurrent.futures¶

This lesson has a strange name. concurrent.futures is the name of a relatively modern package in the Python standard library. It has a beautiful, Pythonic API that hides the low-level mechanisms of concurrency.

concurrent.futures should be your default choice for concurrent programming whenever possible.

In this tutorial, we started with the low-level threading and multiprocessing modules because we wanted to explain the concepts behind concurrency, but concurrent.futures offers a much safer and more intuitive API. Let's get started with it.

In this lecture, we'll keep using our crypto_db server, so make sure you use the terminal to start it up:

$ crypto_db

Executors and futures¶

Executors¶

Executors are the entry points of cf. They are similar to multiprocessing.Pool objects. Once an executor has been instantiated, we can submit jobs, or even map tasks, similar to multiprocessing.Pool.map.

concurrent.futures.Executor is an abstract class. cf includes two concrete classes: ThreadPoolExecutor and ProcessPoolExecutor. This means that we can keep the same interface, but use completely different mechanisms just by changing the executor type we're using:

In [2]:
def check_price(exchange, symbol, date):
    base_url = "http://localhost:8080"
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()

In [3]:
with ThreadPoolExecutor(max_workers=10) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    print(f"Price: ${future.result()['close']}")

Price: $6421.14

In [4]:
with ProcessPoolExecutor(max_workers=10, mp_context=mp.get_context('fork')) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    print(f"Price: ${future.result()['close']}")

Price: $6421.14

This is the beauty of cf: we're using the same logic with two completely different executors; the API is the same.

Futures¶

As you can see from the examples above, the submit method returns a Future object immediately. These objects are an abstraction of a task that is being processed. They have multiple useful methods that we can use (as seen in the following example). The most important one, result(timeout=None), blocks until the result is available. If timeout is given, it waits at most that many seconds and raises concurrent.futures.TimeoutError if the result is still not ready:

In [5]:
with ThreadPoolExecutor(max_workers=10) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    print(future.done())
    print(f"Price: ${future.result()['close']}")
    print(future.done())

False
Price: $6421.14
True

The map method¶

Executors have a map method that is similar to mp.Pool.map. It's convenient because there are no futures to work with, but it's more limited: when the arguments come packed in a single iterable, as in the example below, each call receives just one parameter:

In [6]:
EXCHANGES = ['bitfinex', 'bitstamp', 'kraken']

In [7]:
def check_price_tuple(arg):
    exchange, symbol, date = arg
    base_url = "http://localhost:8080"
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()

In [8]:
with ThreadPoolExecutor(max_workers=10) as ex:
    results = ex.map(check_price_tuple, [
        (exchange, 'btc', '2020-04-01')
        for exchange in EXCHANGES
    ])
    print([price['close'] for price in results])

[6409.8, 6421.14, 6401.9]

In [9]:
('bitstamp', 'btc', '2020-04-01')

Out[9]:
('bitstamp', 'btc', '2020-04-01')

As you can see, we had to define a new special function that works by receiving a tuple instead of the individual elements.

submit & as_completed pattern¶

To overcome this limitation of Executor.map, we can use a common pattern: create multiple futures with Executor.submit and wait for them with the module-level function concurrent.futures.as_completed, which yields each future as soon as it finishes:

In [14]:
with ThreadPoolExecutor(max_workers=10) as ex:
    futures = {
        ex.submit(check_price, exchange, 'btc', '2020-04-01'): exchange
        for exchange in EXCHANGES
    }
    for future in cf.as_completed(futures):
        exchange = futures[future]
        print(f"{exchange.title()}: ${future.result()['close']}")

Kraken: $6401.9
Bitfinex: $6409.8
Bitstamp: $6421.14
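
One thing the pattern above does not show is what happens when a task fails. Here is a minimal sketch, assuming a stand-in function fake_check_price and a FAKE_PRICES table (both hypothetical; they replace the real HTTP call so the example runs without the crypto_db server). An exception raised inside a task is stored in its future and re-raised by result(), so it can be handled per task:

```python
import concurrent.futures as cf
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in data: no HTTP request is made in this sketch.
FAKE_PRICES = {'bitfinex': 6409.8, 'bitstamp': 6421.14}

def fake_check_price(exchange, symbol, date):
    # Raises KeyError for exchanges missing from FAKE_PRICES.
    return {'close': FAKE_PRICES[exchange]}

prices, errors = {}, {}
with ThreadPoolExecutor(max_workers=3) as ex:
    futures = {
        ex.submit(fake_check_price, exchange, 'btc', '2020-04-01'): exchange
        for exchange in ['bitfinex', 'bitstamp', 'kraken']
    }
    for future in cf.as_completed(futures):
        exchange = futures[future]
        try:
            # result() re-raises any exception raised inside the task.
            prices[exchange] = future.result()['close']
        except KeyError as exc:
            errors[exchange] = exc

print(prices)
print(sorted(errors))
```

Handling the exception around result() keeps one failing exchange from hiding the prices that were fetched successfully.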

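A side note on Executor.map: unlike mp.Pool.map, it accepts several iterables and passes one item from each of them to every call, much like the built-in map. The sketch below assumes a hypothetical describe function in place of check_price, so no server is needed; itertools.repeat supplies the arguments that stay constant:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

def describe(exchange, symbol, date):
    # Hypothetical stand-in for check_price; it only formats its arguments.
    return f"{exchange}/{symbol}/{date}"

exchanges = ['bitfinex', 'bitstamp', 'kraken']

with ThreadPoolExecutor(max_workers=3) as ex:
    # One iterable per positional argument; map stops at the shortest one.
    results = list(ex.map(describe, exchanges, repeat('btc'), repeat('2020-04-01')))

print(results)
```

Results come back in the order of the inputs, regardless of which call finishes first.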

Producer/Consumer with concurrent.futures¶

I'll show you an example of the producer/consumer pattern using the cf module. There are multiple ways to implement this pattern; I'll stick to the basics.

In [15]:
BASE_URL = "http://localhost:8080"

In [16]:
resp = requests.get(f"{BASE_URL}/exchanges")

In [17]:
EXCHANGES = resp.json()
EXCHANGES[:3]

Out[17]:
['bitfinex', 'bitstamp', 'bittrex']
In [18]:
START_DATE = datetime(2020, 3, 1)

In [19]:
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]

In [20]:
DATES[:3]

Out[20]:
['2020-03-01', '2020-03-02', '2020-03-03']
In [21]:
resp = requests.get(f"{BASE_URL}/symbols")

In [22]:
SYMBOLS = resp.json()
SYMBOLS

Out[22]:
['btc', 'eth', 'ltc']

Queues:

In [23]:
work_to_do = Queue()
work_done = SimpleQueue()

In [24]:
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            # Each task is a dict describing one price lookup.
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            work_to_do.put(task)

In [25]:
work_to_do.qsize()

Out[25]:
1023
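
The three nested loops that fill the queue can also be expressed with itertools.product. This is a minimal sketch, assuming small hard-coded lists (hypothetical sample values, not the full server response) so that it runs without the crypto_db server:

```python
from itertools import product
from queue import Queue

# Hypothetical sample values; the lesson loads the real ones from the server.
exchanges = ['bitfinex', 'bitstamp']
dates = ['2020-03-01', '2020-03-02', '2020-03-03']
symbols = ['btc', 'eth']

tasks = Queue()
# product yields every (exchange, date, symbol) combination, in the same
# order as the equivalent nested for loops.
for exchange, date, symbol in product(exchanges, dates, symbols):
    tasks.put({'exchange': exchange, 'symbol': symbol, 'date': date})

print(tasks.qsize())  # 2 * 3 * 2 = 12 tasks
```
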
In [26]:
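def worker(task_queue, results_queue):
    while True:
        try:
            # Non-blocking get: raises queue.Empty once all tasks are taken.
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results_queue.put((price, exchange, symbol, date))
        # Mark the task as finished so that work_to_do.join() can return.
        task_queue.task_done()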

In [27]:
with ThreadPoolExecutor(max_workers=32) as ex:
    futures = [
        ex.submit(worker, work_to_do, work_done) for _ in range(32)
    ]
    work_to_do.join()

Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
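
The queue mechanics used here can be tried without the crypto_db server. Below is a minimal sketch, assuming a hypothetical square_worker that squares numbers instead of fetching prices. Every successful get() is paired with a task_done() call, which is what allows join() to return:

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, SimpleQueue

def square_worker(task_queue, results_queue):
    # Hypothetical worker: squares numbers instead of fetching prices.
    while True:
        try:
            number = task_queue.get(block=False)
        except queue.Empty:
            return  # nothing left to do
        try:
            results_queue.put((number, number ** 2))
        finally:
            # Always mark the task as done, otherwise join() blocks forever.
            task_queue.task_done()

todo, done = Queue(), SimpleQueue()
for n in range(100):
    todo.put(n)

with ThreadPoolExecutor(max_workers=8) as ex:
    futures = [ex.submit(square_worker, todo, done) for _ in range(8)]
    todo.join()  # returns once task_done() has been called for every item

print(done.qsize())
```

Putting task_done() in a finally clause is a design choice of this sketch: it keeps join() from hanging if processing a single item raises.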

In [28]:
all([f.done() for f in futures])

Out[28]:
True
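
Keep in mind that done() is also True for a worker that crashed: a future counts as done whether its task returned, raised, or was cancelled. A minimal sketch, assuming a hypothetical flaky_worker, of how Future.exception() exposes such failures:

```python
from concurrent.futures import ThreadPoolExecutor

def flaky_worker(worker_id):
    # Hypothetical worker that fails for odd ids.
    if worker_id % 2:
        raise RuntimeError(f"worker {worker_id} failed")
    return worker_id

with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(flaky_worker, i) for i in range(4)]

# Leaving the with block waits for all workers, so every future is done...
print(all(f.done() for f in futures))

# ...but exception() returns the raised exception, or None on success.
failed = [str(f.exception()) for f in futures if f.exception() is not None]
print(failed)
```
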
In [29]:
work_done.qsize()

Out[29]:
1023
In [30]:
results = {}

In [31]:
while True:
    try:
        price, exchange, symbol, date = work_done.get(block=False)
        results.setdefault(exchange, {})
        results[exchange].setdefault(date, {})
        results[exchange][date][symbol] = price['close'] if price else None
    except queue.Empty:
        break
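
The two setdefault calls can be avoided with a nested collections.defaultdict. This is a minimal sketch, assuming a few hypothetical result tuples in place of the real server data:

```python
import queue
from collections import defaultdict
from queue import SimpleQueue

done = SimpleQueue()
# Hypothetical sample results shaped like (price, exchange, symbol, date).
done.put(({'close': 1.0}, 'exchange-a', 'btc', '2020-03-01'))
done.put((None, 'exchange-a', 'eth', '2020-03-01'))
done.put(({'close': 2.5}, 'exchange-b', 'btc', '2020-03-02'))

# results[exchange][date] is created automatically on first access.
results = defaultdict(lambda: defaultdict(dict))
while True:
    try:
        price, exchange, symbol, date = done.get(block=False)
    except queue.Empty:
        break
    results[exchange][date][symbol] = price['close'] if price else None

print(results['exchange-a']['2020-03-01'])
```

One trade-off to be aware of: reading a missing key from a defaultdict creates it, so lookups of unknown exchanges return an empty mapping instead of raising KeyError.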

In [32]:
results['bitfinex']['2020-03-10']['btc']

Out[32]:
7941
In [33]:
results['bitstamp']['2020-03-10']['btc']

Out[33]:
7936.25
In [34]:
results['coinbase-pro']['2020-03-10']['btc']

Out[34]:
7934.52

Summary¶

The concurrent.futures module is the most abstract, highest-level concurrency module covered in this tutorial, and it SHOULD be your default option when writing concurrent code. Use the threading or multiprocessing modules directly only if you need more advanced capabilities.