20. Concurrent Executors🔗
- In 99% of the use cases an application programmer is likely to run into, the following pattern is all that is needed.
- This chapter focuses on the `concurrent.futures.Executor` classes, which encapsulate the pattern of spawning a bunch of independent threads and collecting the results in a queue.
- It is mostly systems programmers who use the other features of Python quite heavily.
Concurrent Web Downloads🔗
- Concurrency is essential for efficient network I/O: instead of idly waiting for a response, the application should work on something else.
- The following code fetches 20 country flags from the web. There are 4 versions of it: the sequential one is the slowest, while the concurrent implementations are faster.
A Sequential Download Script🔗
import time
from pathlib import Path
from typing import Callable
import httpx # not part of standard library/actually its convention to leave one blank line
# ISO 3166 country codes of the flags to download
POP20_CC = (
    'CN IN US ID BR PK NG BD RU JP '
    'MX PH VN ET EG DE IR TR CD FR'
).split()

# Remote directory that holds the flag images
BASE_URL = 'https://www.fluentpython.com/data/flags'

# Local directory where the downloaded images are written
DEST_DIR = Path('downloaded')
def save_flag(img: bytes, filename: str) -> None:
    """Write the raw image bytes ``img`` to ``filename`` inside ``DEST_DIR``."""
    target = DEST_DIR / filename
    target.write_bytes(img)
def get_flag(cc: str) -> bytes:
    """Download the flag image for country code ``cc`` and return its bytes.

    No error handling is done here: ``raise_for_status()`` raises on an
    error response and the exception propagates to the caller.
    """
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    # Network calls should always carry a timeout; redirects are requested
    # explicitly because they are not followed by default.
    response = httpx.get(url, follow_redirects=True, timeout=6.1)
    response.raise_for_status()
    return response.content
def download_many(cc_list: list[str]) -> int:
    """Fetch and save every flag one after another, in sorted code order.

    This sequential version is the baseline the concurrent versions are
    compared against. Returns the number of country codes in ``cc_list``.
    """
    for code in sorted(cc_list):
        save_flag(get_flag(code), f'{code}.gif')
        # Flushed so each country code is displayed one at a time.
        print(code, end=' ', flush=True)
    return len(cc_list)
def main(downloader: Callable[[list[str]], int]) -> None:
    """Run ``downloader`` over ``POP20_CC`` and report the count and time taken.

    ``downloader`` is the download function to use; it receives the list of
    country codes and returns how many downloads it performed.
    """
    DEST_DIR.mkdir(exist_ok=True)  # create the target directory if missing
    started = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - started
    print(f'\n{count} downloads in {elapsed:.2f}s')
if __name__ == '__main__':
    # Entry point: run main with the sequential download_many as downloader.
    main(download_many)
Downloading with concurrent.futures🔗
- The main features of the `concurrent.futures` package are the `ThreadPoolExecutor` and `ProcessPoolExecutor` classes, which implement an API to submit callables for execution in different threads or processes, respectively.
from concurrent import futures
from flags import save_flag, get_flag, main # reuse old functions
def download_one(cc: str):
    """Download and save the flag for ``cc``; the task each worker executes.

    Prints the country code once the image is saved and returns ``cc``.
    """
    save_flag(get_flag(cc), f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc
def download_many(cc_list: list[str]) -> int:
    """Download the flags on a thread pool and return the number of results.

    ``executor.map`` works like the built-in ``map`` but returns a generator
    that yields the value returned by each ``download_one`` call.
    """
    codes = sorted(cc_list)
    with futures.ThreadPoolExecutor() as pool:
        results = pool.map(download_one, codes)

    # Building the list calls next() on the generator for every item; if any
    # threaded call raised an exception, it is re-raised here when its
    # return value is retrieved.
    return len(list(results))
if __name__ == '__main__':
    # Entry point: reuse the imported main, passing the thread-pool
    # download_many as the downloader.
    main(download_many)
- The computed default for `max_workers` is sensible, and `ThreadPoolExecutor` avoids starting new workers unnecessarily. Understanding the logic behind `max_workers` may help you decide when and how to set it yourself.
Where are the Futures?🔗
- There are two classes named `Future` in the standard library: `concurrent.futures.Future` and `asyncio.Future`. They serve the same purpose: an instance of either `Future` class represents a deferred computation that may or may not have completed.
- Futures encapsulate pending operations so that we can put them in queues, check whether they are done, and retrieve results (or exceptions) when they become available.
- We should not create futures ourselves: they are meant to be created by the concurrency framework. A `Future` represents something that will eventually run, therefore it must be scheduled to run, and that is the job of the framework.
- Application code is not supposed to change the state of a future: the concurrency framework changes the state of a future when the computation it represents is done, and we can’t control when that happens.
- Both types of `Future` have a `.done()` method that is nonblocking and returns a Boolean that tells you whether the callable wrapped by that future has executed or not. However, instead of repeatedly asking whether a future is done, client code usually asks to be notified. That’s why both `Future` classes have an `.add_done_callback()` method: you give it a callable, and the callable will be invoked with the future as the single argument when the future is done. Be aware that the callback callable will run in the same worker thread or process that ran the function wrapped in the future.
- There is also a `.result()` method, which works the same in both classes when the future is done: it returns the result of the callable, or re-raises whatever exception might have been thrown when the callable was executed.
- However, when the future is not done, the behavior of the `result` method is very different between the two flavors of `Future`. In a `concurrent.futures.Future` instance, invoking `f.result()` will block the caller’s thread until the result is ready. An optional `timeout` argument can be passed, and if the future is not done in the specified time, the `result` method raises `TimeoutError`. The `asyncio.Future.result` method does not support timeout, and `await` is the preferred way to get the result of futures in `asyncio`—but `await` doesn’t work with `concurrent.futures.Future` instances.
def download_many(cc_list: list[str]) -> int:
    """Download at most five flags via ``submit``/``as_completed``.

    Each download is submitted as its own future so the scheduling and
    completion of every future can be printed. Returns the number of futures
    that completed; an empty ``cc_list`` returns 0.
    """
    cc_list = cc_list[:5]  # take only 5 entries for this demonstration
    # Initialised up front: with an empty cc_list the as_completed loop never
    # runs, and `return count` would otherwise raise UnboundLocalError.
    count = 0
    # max_workers=3 so that some futures are still pending in the output.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        # Submitted in sorted order; results will not arrive in that order.
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)  # schedule the callable
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')

        # as_completed yields the futures as they are completed.
        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')

    return count
Launching Processes with concurrent.futures🔗
- `concurrent.futures` supports parallel computation on multicore machines because it supports distributing work among multiple Python processes using `ProcessPoolExecutor`.
- For our flag-download program, or any other I/O-bound job, there is no advantage in using a process pool executor: we will get the same performance.
- It is useful for CPU-intensive jobs.
Multicore Prime Checker Redux🔗
import sys
from concurrent import futures
from time import perf_counter
from typing import NamedTuple
from primes import is_prime, NUMBERS
class PrimeResult(NamedTuple):
    """Outcome of one primality check, as produced by ``check``."""

    n: int          # the number that was checked
    flag: bool      # whether the number was found to be prime
    elapsed: float  # time the check took, measured with perf_counter
def check(n: int) -> PrimeResult:
    """Run ``is_prime(n)`` and return the verdict together with its timing."""
    started = perf_counter()
    verdict = is_prime(n)
    elapsed = perf_counter() - started
    return PrimeResult(n, verdict, elapsed)
def main() -> None:
    """Check every number in ``NUMBERS`` on a process pool and print timings.

    An optional first command-line argument sets the number of worker
    processes; without it ``None`` is passed and the executor chooses.
    """
    workers = int(sys.argv[1]) if len(sys.argv) >= 2 else None

    executor = futures.ProcessPoolExecutor(workers)
    # _max_workers is an undocumented instance attribute of the executor,
    # read only to show the pool size; the comment silences the type checker.
    actual_workers = executor._max_workers  # type: ignore

    print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')

    started = perf_counter()
    # Largest first, to expose how executor.map orders its results.
    numbers = sorted(NUMBERS, reverse=True)
    with executor:
        for n, prime, elapsed in executor.map(check, numbers):
            label = 'P' if prime else ' '
            print(f'{n:16} {label} {elapsed:9.6f}s')

    time = perf_counter() - started
    print(f'Total time: {time:.2f}s')
if __name__ == '__main__':
    # Entry point of the prime-checking script.
    main()
- You’ll see the results appearing in strict descending order. In contrast, the ordering of the output of procs.py (shown in “Process-Based Solution”) is heavily influenced by the difficulty in checking whether each number is a prime.
- `executor.map(check, numbers)` always returns the results in the same order as the numbers are given.
Experimenting with Executor.map🔗
from time import sleep, strftime
from concurrent import futures
def display(*args):
    """Print ``args`` on one line, preceded by a ``[HH:MM:SS]`` timestamp."""
    stamp = strftime('[%H:%M:%S]')
    print(stamp, end=' ')
    print(*args)
def loiter(n):
    """Sleep for ``n`` seconds, displaying a message before and after.

    Both messages are indented with ``n`` tab characters. Returns ``n * 10``
    so the caller has a result to collect.
    """
    indent = '\t' * n
    display('{}loiter({}): doing nothing for {}s...'.format(indent, n, n))
    sleep(n)
    display('{}loiter({}): done.'.format(indent, n))
    return n * 10
def main():
    """Demonstrate how ``Executor.map`` hands back its results.

    Five ``loiter`` calls are submitted to a pool of three threads and their
    results are displayed in submission order.
    """
    display('Script starting.')
    # The executor is used as a context manager so its worker threads are
    # always shut down, even if retrieving a result raises.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # only 3 threads
        results = executor.map(loiter, range(5))  # submit 5 tasks to the executor
        # map has already returned: this shows the generator object itself.
        display('results:', results)
        display('Waiting for individual results:')
        # enumerate implicitly calls next(results), which calls result() on
        # the internal future of each call in turn, starting with loiter(0).
        # result() blocks until that future is done, so every iteration waits
        # for the next result to be ready.
        for i, result in enumerate(results):
            display(f'result {i}: {result}')
if __name__ == '__main__':
    # Entry point of the Executor.map experiment.
    main()
- The `Executor.map` function is easy to use, but often it’s preferable to get the results as they are ready, regardless of the order they were submitted. To do that, we need a combination of the `Executor.submit` method and the `futures.as_completed` function.
- The combination of `executor.submit` and `futures.as_completed` is more flexible than `executor.map` because you can `submit` different callables and arguments, while `executor.map` is designed to run the same callable on the different arguments. In addition, the set of futures you pass to `futures.as_completed` may come from more than one executor—perhaps some were created by a `ThreadPoolExecutor` instance, while others are from a `ProcessPoolExecutor`.
Downloads with Progress Display and Error Handling🔗
We will implement a version of `flags2.py` with an animated, text-mode progress bar implemented with the `tqdm` package.
from collections import Counter
from http import HTTPStatus
import httpx
import tqdm # type: ignore 1
from flags2_common import main, save_flag, DownloadStatus # import already implemented stuff
# Concurrency settings for this script; both are fixed at 1.
# NOTE(review): presumably 1 because this is the sequential version, whose
# download_many ignores its concurrency argument — confirm against main().
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
def get_flag(base_url: str, cc: str) -> bytes:
    """Download the flag for ``cc`` from under ``base_url``; return its bytes.

    Raises ``httpx.HTTPStatusError`` when the response status code is not in
    ``range(200, 300)``.
    """
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    response = httpx.get(url, follow_redirects=True, timeout=3.1)
    response.raise_for_status()
    return response.content
def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
    """Download and save one flag, returning the resulting ``DownloadStatus``.

    A 404 response is handled here and reported as
    ``DownloadStatus.NOT_FOUND``; any other ``httpx.HTTPStatusError`` is
    re-raised for the caller. With ``verbose`` set, the country code and
    outcome message are printed.
    """
    try:
        image = get_flag(base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code != HTTPStatus.NOT_FOUND:
            raise  # only "not found" is handled locally
        status = DownloadStatus.NOT_FOUND
        msg = f'not found: {res.url}'
    else:
        save_flag(image, f'{cc}.gif')
        status, msg = DownloadStatus.OK, 'OK'

    if verbose:
        print(cc, msg)

    return status
Sequential Implementation🔗
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  _unused_concur_req: int) -> Counter[DownloadStatus]:
    """Download the flags one by one and tally the outcomes.

    Returns a ``Counter`` keyed by ``DownloadStatus``. HTTP and request
    errors are counted as ``DownloadStatus.ERROR`` (and printed when
    ``verbose``); a ``KeyboardInterrupt`` stops the loop and the tally so far
    is returned. The last parameter is accepted but not used.
    """
    counter: Counter[DownloadStatus] = Counter()
    codes = sorted(cc_list)
    if not verbose:
        # Outside verbose mode, wrap the codes in tqdm, which yields the same
        # items while displaying a progress bar.
        codes = tqdm.tqdm(codes)

    for cc in codes:
        error_msg = ''  # stays empty unless one of the handlers below fires
        try:
            status = download_one(cc, base_url, verbose)
        except httpx.HTTPStatusError as exc:
            error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'.format(
                resp=exc.response)
        except httpx.RequestError as exc:
            error_msg = f'{exc} {type(exc)}'.strip()
        except KeyboardInterrupt:
            break

        if error_msg:
            status = DownloadStatus.ERROR
        counter[status] += 1
        if verbose and error_msg:
            print(f'{cc} error: {error_msg}')

    return counter
Futures.as_completed🔗
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx
import tqdm # type: ignore
from flags2_common import main, DownloadStatus
from flags2_sequential import download_one
# Concurrency settings handed to main() by this script's entry point.
DEFAULT_CONCUR_REQ = 30  # default number of concurrent requests
MAX_CONCUR_REQ = 1000  # maximum number of concurrent requests
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    """Download the flags on a thread pool and tally the outcomes.

    ``concur_req`` is used as the pool's ``max_workers``. Returns a
    ``Counter`` keyed by ``DownloadStatus``. HTTP and request errors are
    counted as ``DownloadStatus.ERROR`` (and printed when ``verbose``); a
    ``KeyboardInterrupt`` stops the collection loop.
    """
    counter: Counter[DownloadStatus] = Counter()
    with ThreadPoolExecutor(max_workers=concur_req) as executor:
        # Each future is mapped back to its country code for error reporting.
        to_do_map = {
            executor.submit(download_one, cc, base_url, verbose): cc
            for cc in sorted(cc_list)
        }
        done_iter = as_completed(to_do_map)
        if not verbose:
            # total is given explicitly (presumably because the as_completed
            # iterator has no length for tqdm to read).
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))

        for future in done_iter:
            error_msg = ''  # stays empty unless one of the handlers fires
            try:
                status = future.result()
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'.format(
                    resp=exc.response)
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
            except KeyboardInterrupt:
                break

            if error_msg:
                status = DownloadStatus.ERROR
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]
                print(f'{cc} error: {error_msg}')

    return counter
if __name__ == '__main__':
    # Entry point: hand the thread-pool download_many and this script's
    # concurrency settings to the shared main.
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)