import queue
import multiprocessing as mp
import concurrent.futures as cf
from queue import Queue, SimpleQueue
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime, timedelta
import requests
concurrent.futures¶
This lesson has a strange name. concurrent.futures is the name of a (relatively) modern package in the Python standard library. It's a package with a beautiful, Pythonic API that abstracts away the low-level mechanisms of concurrency. concurrent.futures should be your default choice for concurrent programming as much as possible.
In this tutorial, we started with the low-level threading and multiprocessing modules because we wanted to explain the concepts behind concurrency, but concurrent.futures offers a much safer and more intuitive API. Let's start with it.
In this lecture, we'll keep using our crypto_db
server, so make sure you use the terminal to start it up:
$ crypto_db
Executors and futures¶
Executors¶
Executors are the entry points of cf. They are similar to multiprocessing.Pool. Once an executor has been instantiated, we can submit jobs, or even map tasks, similar to multiprocessing.Pool.map. concurrent.futures.Executor is an abstract class; cf includes two concrete subclasses: ThreadPoolExecutor and ProcessPoolExecutor. This means that we can keep the same interface but use completely different concurrency mechanisms just by changing the executor type we're using:
def check_price(exchange, symbol, date):
    base_url = "http://localhost:8080"
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()
with ThreadPoolExecutor(max_workers=10) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    print(f"Price: ${future.result()['close']}")
with ProcessPoolExecutor(max_workers=10, mp_context=mp.get_context('fork')) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    print(f"Price: ${future.result()['close']}")
This is the beauty of cf: we're using the same logic with two completely different executors; the API is the same.
Futures¶
As you can see from the examples above, the submit method immediately returns a Future object. These objects are an abstraction of a task that is being processed. They have multiple useful methods we can use (as seen in the following example). The most important one, result(timeout=None), will block for up to timeout seconds until a result is produced:
with ThreadPoolExecutor(max_workers=10) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    print(future.done())
    print(f"Price: ${future.result()['close']}")
    print(future.done())
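Aside from done() and result(), Future objects expose a few other useful methods, such as exception() and add_done_callback(). A small self-contained sketch (using a toy may_fail function instead of check_price, so it runs without the server):

```python
from concurrent.futures import ThreadPoolExecutor

def may_fail(n):
    # Toy task: raises for negative input, succeeds otherwise
    if n < 0:
        raise ValueError("negative input")
    return n * 2

with ThreadPoolExecutor(max_workers=2) as ex:
    ok = ex.submit(may_fail, 21)
    bad = ex.submit(may_fail, -1)

    # result() re-raises any exception from the task;
    # exception() returns it instead (or None on success)
    print(ok.result())            # 42
    print(repr(bad.exception()))  # ValueError('negative input')

    # add_done_callback() runs a function once the future finishes
    ex.submit(may_fail, 5).add_done_callback(
        lambda f: print(f"done, result={f.result()}")
    )
```

Note that result() and exception() both block until the future has finished, which is why they're safe to call right after submit.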
The map method¶
Executors have a map method that is similar to mp.Pool.map. It's convenient, as there are no futures to work with, but each call receives a single item from the iterable, so a function taking several parameters needs its arguments packed together:
EXCHANGES = ['bitfinex', 'bitstamp', 'kraken']
def check_price_tuple(arg):
    exchange, symbol, date = arg
    base_url = "http://localhost:8080"
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()
with ThreadPoolExecutor(max_workers=10) as ex:
    results = ex.map(check_price_tuple, [
        (exchange, 'btc', '2020-04-01')
        for exchange in EXCHANGES
    ])
    print([price['close'] for price in results])
As you can see, we had to define a new special function that receives a tuple instead of the individual arguments.
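It's worth noting that, unlike mp.Pool.map, Executor.map also accepts multiple iterables, one per parameter, which avoids the tuple-wrapper entirely. A quick sketch with a toy describe function (a stand-in for check_price, so it runs offline):

```python
from concurrent.futures import ThreadPoolExecutor

def describe(exchange, symbol, date):
    # Toy stand-in for check_price; just joins the arguments
    return f"{exchange}:{symbol}:{date}"

exchanges = ['bitfinex', 'bitstamp', 'kraken']

with ThreadPoolExecutor(max_workers=3) as ex:
    # map(fn, *iterables): the i-th call receives the i-th item of each iterable
    results = list(ex.map(describe,
                          exchanges,
                          ['btc'] * len(exchanges),
                          ['2020-04-01'] * len(exchanges)))

print(results)
# ['bitfinex:btc:2020-04-01', 'bitstamp:btc:2020-04-01', 'kraken:btc:2020-04-01']
```

Results come back in the same order as the inputs, regardless of which task finished first.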
submit & as_completed pattern¶
To overcome the limitation of Executor.map, we can use a common pattern: create multiple futures with Executor.submit and wait for them to complete with the module-level function concurrent.futures.as_completed:
with ThreadPoolExecutor(max_workers=10) as ex:
    futures = {
        ex.submit(check_price, exchange, 'btc', '2020-04-01'): exchange
        for exchange in EXCHANGES
    }
    for future in cf.as_completed(futures):
        exchange = futures[future]
        print(f"{exchange.title()}: ${future.result()['close']}")
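A detail worth highlighting: as_completed yields futures in the order they finish, not the order they were submitted. A standalone sketch with a toy slow_echo function (names assumed, not part of the lesson's server code):

```python
import concurrent.futures as cf
import time
from concurrent.futures import ThreadPoolExecutor

def slow_echo(label, delay):
    # Sleep for `delay` seconds, then return the label
    time.sleep(delay)
    return label

with ThreadPoolExecutor(max_workers=3) as ex:
    futures = {
        ex.submit(slow_echo, label, delay): label
        for label, delay in [('slow', 0.3), ('fast', 0.05), ('mid', 0.15)]
    }
    finished_order = [f.result() for f in cf.as_completed(futures)]

print(finished_order)
# ['fast', 'mid', 'slow'] — completion order, not submission order
```

This is exactly why the futures-to-exchange dictionary is useful in the example above: once futures start arriving out of order, you need a way to map each one back to the task it belongs to.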
Producer/Consumer with concurrent.futures¶
I'll show you an example of the producer/consumer pattern using the cf module. There are multiple ways to build this pattern; I'll stick to the basics.
BASE_URL = "http://localhost:8080"
resp = requests.get(f"{BASE_URL}/exchanges")
EXCHANGES = resp.json()
EXCHANGES[:3]
START_DATE = datetime(2020, 3, 1)
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]
DATES[:3]
resp = requests.get(f"{BASE_URL}/symbols")
SYMBOLS = resp.json()
SYMBOLS
Queues:
work_to_do = Queue()
work_done = SimpleQueue()
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            work_to_do.put(task)
work_to_do.qsize()
def worker(task_queue, results_queue):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results_queue.put((price, exchange, symbol, date))
        task_queue.task_done()
with ThreadPoolExecutor(max_workers=32) as ex:
    futures = [
        ex.submit(worker, work_to_do, work_done) for _ in range(32)
    ]
    work_to_do.join()
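The work_to_do.join() call only returns once task_done() has been called for every item that was put() on the queue, which is why each worker marks the task after processing it. A minimal standalone sketch of that handshake (toy names, independent of the crypto example):

```python
import queue
import threading

q = queue.Queue()
processed = []

for i in range(5):
    q.put(i)

def drain():
    # Pull items until the queue is empty, marking each one done
    while True:
        try:
            item = q.get(block=False)
        except queue.Empty:
            return
        processed.append(item * 10)
        q.task_done()   # one task_done() per successful get() unblocks join()

threads = [threading.Thread(target=drain) for _ in range(2)]
for t in threads:
    t.start()

q.join()   # blocks until every put() item has been marked done
print(sorted(processed))   # [0, 10, 20, 30, 40]
```

Forgetting task_done() in a worker is a classic bug: join() would then block forever, because the queue's internal counter of unfinished tasks never reaches zero.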
all([f.done() for f in futures])
work_done.qsize()
results = {}
while True:
    try:
        price, exchange, symbol, date = work_done.get(block=False)
        results.setdefault(exchange, {})
        results[exchange].setdefault(date, {})
        results[exchange][date][symbol] = price['close'] if price else None
    except queue.Empty:
        break
results['bitfinex']['2020-03-10']['btc']
results['bitstamp']['2020-03-10']['btc']
results['coinbase-pro']['2020-03-10']['btc']
Summary¶
The concurrent.futures module is the most abstract, highest-level concurrency module in the Python standard library, and it should be your default option when writing concurrent code. Only if you need more advanced capabilities should you use the threading or multiprocessing modules directly.