Asynchronous Programming🔗
Three main topics to be addressed:
- Python's `async def`, `await`, `async with`, and `async for` constructs
- Objects supporting those constructs: native coroutines and asynchronous variants of context managers, iterables, generators, and comprehensions
- `asyncio` and other asynchronous libraries
A Few Definitions🔗
Python offers three kinds of coroutines (a sketch contrasting them follows these definitions):
- Native coroutines
  - A coroutine function defined with `async def`. A native coroutine can delegate to another native coroutine using the `await` keyword, similar to how classic coroutines use `yield from`.
  - NOTE: `await` can't be used outside an `async def` function, but an `async def` function is a native coroutine even if `await` never appears in its body.
- Classic coroutines
  - A generator function that consumes data sent to it via `my_coro.send(data)` calls, and reads that data by using `yield` in an expression. Classic coroutines can delegate to other classic coroutines using `yield from`.
  - Classic coroutines can't be driven by `await` and are no longer supported by asyncio.
- Generator-based coroutines
  - A generator function decorated with `@types.coroutine`. That decorator makes the generator function compatible with the `await` keyword.

Asynchronous generator
- A generator function defined with `async def` and using `yield` in its body. It returns an asynchronous generator object that provides `__anext__`, a coroutine method to retrieve the next item.
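A minimal sketch contrasting the three kinds of coroutines, plus an asynchronous generator; all names here are illustrative, not from any library:

```python
import types

async def native():            # native coroutine: async def; await not required in the body
    return 42

def classic():                 # classic coroutine: a generator driven by .send()
    received = yield
    print('received:', received)

@types.coroutine
def generator_based():         # generator-based coroutine: compatible with await
    yield

async def agen():              # asynchronous generator: async def + yield in the body
    yield 1
```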
An Asyncio Example: Probing Domains🔗
A search for domains for a Python blog:
```python
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4

async def probe(domain: str) -> tuple[str, bool]:  # returns a tuple with the domain name and a boolean; True means the domain resolved
    loop = asyncio.get_running_loop()  # get a reference to the asyncio event loop
    try:
        await loop.getaddrinfo(domain, None)  # coroutine version of socket.getaddrinfo; returns the five-part address tuples needed to connect to the given address using a socket
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

async def main() -> None:  # main must be a coroutine
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)  # generator yielding short Python keywords
    domains = (f'{name}.dev'.lower() for name in names)  # append the .dev suffix
    coros = [probe(domain) for domain in domains]  # list of coroutine objects
    for coro in asyncio.as_completed(coros):  # as_completed yields coroutines that return their results in the order they complete
        domain, found = await coro  # since this coroutine is already completed, await doesn't block
        mark = '+' if found else ' '
        print(f'{mark} {domain}')

if __name__ == '__main__':
    asyncio.run(main())  # starts the event loop and returns only when the event loop exits
```
Guido’s Trick to Read Asynchronous Code🔗
- Pretend the `async` and `await` keywords are not there. If you do that, you will realize that coroutines read like plain old sequential functions.
- Here, during execution of the `probe('if.dev')` coroutine, a new coroutine object is created by `getaddrinfo`. Awaiting it starts the low-level query and yields control back to the event loop. The event loop can then drive other pending coroutine objects. When there is a response, that specific coroutine object resumes and returns control back to `probe('if.dev')`, which was suspended at `await` and can now handle possible exceptions and return the result tuple.
- `asyncio.as_completed` and `await` can be applied to any awaitable object.
New Concept: Awaitable🔗
The `for` keyword works with iterables. The `await` keyword works with awaitables.
These are the awaitables you will usually encounter:
- A native coroutine object, which you get by calling a native coroutine function
- An `asyncio.Task`, which you usually get by passing a coroutine object to `asyncio.create_task()`
However, end-user code does not always need to `await` on a `Task`. We use `asyncio.create_task(one_coro())` to schedule `one_coro` for concurrent execution, without waiting for its return. That's what we did with the `spinner` coroutine in spinner_async.py. If you don't expect to cancel the task or wait for it, there is no need to keep the `Task` object returned from `create_task`; creating the task is enough to schedule the coroutine to run, as the sketch below illustrates.
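A minimal sketch of that fire-and-forget pattern, assuming a trivial `one_coro` (both names are illustrative):

```python
import asyncio

async def one_coro() -> None:
    await asyncio.sleep(1)

async def main() -> None:
    task = asyncio.create_task(one_coro())  # scheduled to start as soon as main yields control
    await asyncio.sleep(0)  # give the event loop a chance to start the task
    # ... other work here; await task only if you need its result or want to cancel it
    await task  # here we do wait, so the program doesn't exit before one_coro finishes

asyncio.run(main())
```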
When implementing asynchronous libraries or contributing to asyncio itself, you may also deal with these lower-level awaitables:
- An object with an `__await__` method that returns an iterator; for example, an `asyncio.Future` instance (`asyncio.Task` is a subclass of `asyncio.Future`); see the sketch after this list
- Objects written in other languages using the Python/C API with a `tp_as_async.am_await` function, returning an iterator (similar to the `__await__` method)
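A minimal sketch of the first kind: a hypothetical `Ready` class whose `__await__` returns an iterator, which is all it takes to make its instances awaitable:

```python
import asyncio

class Ready:  # hypothetical minimal awaitable
    def __await__(self):
        return iter(())  # an iterator that yields nothing, so awaiting completes at once

async def demo() -> None:
    result = await Ready()  # works because Ready implements __await__
    print(result)  # None: the implicit value of the iterator's StopIteration

asyncio.run(demo())
```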
Downloading with asyncio and HTTPX🔗
As of Python 3.10, asyncio only supports TCP and UDP directly, and there are no asynchronous HTTP client or server packages in the standard library. We will be using HTTPX in all the HTTP client examples.
flags_asyncio.py: startup functions
```python
def download_many(cc_list: list[str]) -> int:  # must be a plain function, not a coroutine, as it is passed to and called by main
    return asyncio.run(supervisor(cc_list))  # execute the event loop driving the supervisor(cc_list) coroutine object until it returns; this blocks while the event loop runs

async def supervisor(cc_list: list[str]) -> int:
    async with AsyncClient() as client:  # asynchronous HTTP operations in httpx are methods of AsyncClient
        to_do = [download_one(client, cc)
                 for cc in sorted(cc_list)]  # build a list of coroutine objects
        res = await asyncio.gather(*to_do)  # await on .gather, which waits for all the coroutines to complete
    return len(res)  # return the length of the list returned by asyncio.gather

if __name__ == '__main__':
    main(download_many)
```
flags_asyncio.py: imports and download functions
```python
import asyncio

from httpx import AsyncClient  # must be installed; not in the standard library

from flags import BASE_URL, save_flag, main  # reuse code from flags.py

async def download_one(client: AsyncClient, cc: str):  # must be a native coroutine, so it can await on get_flag
    image = await get_flag(client, cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

async def get_flag(client: AsyncClient, cc: str) -> bytes:  # needs to receive the AsyncClient instance to make the request
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=6.1,
                            follow_redirects=True)  # .get is a coroutine method of AsyncClient, so network I/O is driven asynchronously by the event loop
    return resp.read()
```
The Secret of Native Coroutines: Humble Generators🔗
- A key difference between classic coroutines and flags_asyncio.py is that there are no visible `.send()` calls or `yield` expressions in the latter. Your code sits between the asyncio library and the asynchronous libraries you are using, such as HTTPX.
- Under the hood, the `asyncio` event loop makes the `.send` calls that drive your coroutines, and your coroutines `await` on other coroutines, including library coroutines. As mentioned, `await` borrows most of its implementation from `yield from`, which also makes `.send` calls to drive coroutines.
- The `await` chain eventually reaches a low-level awaitable, which returns a generator that the event loop can drive in response to events such as timers or network I/O. The low-level awaitables and generators at the end of these `await` chains are implemented deep in the libraries, are not part of their APIs, and may be Python/C extensions.
- Using functions like `asyncio.gather` and `asyncio.create_task`, you can start multiple concurrent `await` channels, enabling concurrent execution of multiple I/O operations driven by a single event loop, in a single thread.
The All-or-Nothing Problem🔗
We could not reuse the `get_flag` function from flags.py. We had to rewrite it as a coroutine to use the asynchronous API of HTTPX. For peak performance with asyncio, we must replace every function that does I/O with an asynchronous version that is activated with `await` or `asyncio.create_task`, so that control is given back to the event loop while the function waits for I/O. If you can't rewrite a blocking function as a coroutine, you should run it in a separate thread or process.
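A minimal sketch of the thread option, pushing a blocking call to a worker thread with `asyncio.to_thread` (available since Python 3.9); `blocking_read` and the path are hypothetical:

```python
import asyncio

def blocking_read(path: str) -> bytes:  # hypothetical blocking function
    with open(path, 'rb') as fp:
        return fp.read()

async def main() -> None:
    # run the blocking call in a worker thread so the event loop stays free
    data = await asyncio.to_thread(blocking_read, '/etc/hostname')
    print(len(data), 'bytes')

asyncio.run(main())
```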
Asynchronous Context Managers🔗
- A context manager implements `__enter__` and `__exit__` (for graceful cleanup).
- In an asynchronous driver like `asyncpg`, the setup and wrap-up need to be coroutines so that other operations can happen concurrently. But the classic `with` statement doesn't work in asynchronous code; that is why `async with` was introduced, relying on `__aenter__` and `__aexit__` methods implemented as coroutines.
```python
async with connection.transaction():
    await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
```
- Using coroutines to implement `Transaction` as an asynchronous context manager allows asyncpg to handle many transactions concurrently. A hand-rolled example follows.
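To make the protocol concrete, here is a minimal sketch of a hand-rolled asynchronous context manager; the `Session` class and its sleep-based timing are illustrative, not part of asyncpg:

```python
import asyncio

class Session:  # hypothetical example
    async def __aenter__(self):
        await asyncio.sleep(0.1)  # pretend setup needs I/O (e.g., opening a connection)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0.1)  # pretend wrap-up needs I/O (e.g., commit or rollback)

async def main() -> None:
    async with Session():  # awaits __aenter__ on entry and __aexit__ on exit
        print('inside the session')

asyncio.run(main())
```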
Enhancing the asyncio Downloader🔗
- We will now add the progress display and error handling of the `flags2` examples to the asyncio downloader.
Using asyncio.as_completed and a Thread🔗
- In the previous example we used `asyncio.gather`, which returns a list with the results of the coroutines in the order they were submitted; this implies it only returns when all of them are completed.
- However, to update a progress bar we need to know results as they are done.
- We will use the `asyncio` equivalent of the `as_completed` generator function that we used in the thread pool example.
```python
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,  # very similar to the sequential implementation
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)  # .get is an AsyncClient coroutine method, so we await it
    resp.raise_for_status()
    return resp.content

async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # semaphore as an async context manager, so the program as a whole is not blocked; only this coroutine is suspended when the semaphore counter is zero
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:  # error handling as before
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')  # file I/O: run in a thread to avoid blocking the event loop
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
```
Throttling Requests with a Semaphore🔗
- Network clients should be throttled to avoid accidentally launching a denial-of-service attack on a server.
- A semaphore is a synchronization primitive, more flexible than a lock. It can be held by multiple coroutines, up to a configurable maximum number.
- In the thread pool implementation of `flags2.py` we used max_workers as the throttle. In the example above, the supervisor function creates an `asyncio.Semaphore`, which is passed as an argument to download_one.
- Computer scientist Edsger W. Dijkstra invented the semaphore in the early 1960s. It's a simple idea, but it's so flexible that most other synchronization objects—such as locks and barriers—can be built on top of semaphores. There are three `Semaphore` classes in Python's standard library: one in `threading`, another in `multiprocessing`, and a third one in `asyncio`.
- An `asyncio.Semaphore` has an internal counter that is decremented whenever we `await` on the `.acquire()` coroutine method, and incremented when we call the `.release()` method.
- Awaiting on `.acquire()` causes no delay when the counter is greater than zero, but if the counter is zero, `.acquire()` suspends the awaiting coroutine until some other coroutine calls `.release()` on the same `Semaphore`, thus incrementing the counter. Instead of using those methods directly, it's safer to use the semaphore as an asynchronous context manager, as shown in the sketch after this list.
- The `Semaphore.__aenter__` coroutine method awaits for `.acquire()`, and its `__aexit__` coroutine method calls `.release()`. That snippet guarantees that no more than concur_req instances of the `get_flag` coroutine will be active at any time. Each of the `Semaphore` classes in the standard library has a `BoundedSemaphore` subclass that enforces an additional constraint: the internal counter can't rise above its initial value.
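A minimal sketch of what `async with semaphore:` expands to, using the acquire/release coroutine methods directly; the `work` coroutine is a stand-in for the throttled operation:

```python
import asyncio

async def work(semaphore: asyncio.Semaphore) -> None:
    # roughly what 'async with semaphore:' does under the hood
    await semaphore.acquire()      # suspends this coroutine while the counter is zero
    try:
        await asyncio.sleep(0.1)   # the throttled operation would go here
    finally:
        semaphore.release()        # increment the counter, waking one waiter

async def main() -> None:
    semaphore = asyncio.Semaphore(3)  # at most 3 concurrent holders
    await asyncio.gather(*(work(semaphore) for _ in range(10)))

asyncio.run(main())
```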
```python
# rest of the code for the asyncio downloader above

async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  # same args as download_many
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # semaphore shared by all download_one coroutines
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # create a list of coroutine objects
        to_do_iter = asyncio.as_completed(to_do)  # iterator that yields coroutines as they are completed
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # wrap with tqdm to display progress
        error: httpx.HTTPError | None = None  # holds the exception, if any
        for coro in to_do_iter:  # iterate over the completed coroutines
            try:
                status = await coro  # await for the result
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # the scope of exc is limited to the except clause, so assign to error
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc
            except KeyboardInterrupt:
                break
            if error:
                status = DownloadStatus.ERROR  # set the status if the download failed
                if verbose:
                    url = str(error.request.url)
                    cc = Path(url).stem.upper()
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1
    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  # instantiate the supervisor coroutine object and pass it to the event loop with asyncio.run
    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
```
Making Multiple Requests for Each Download🔗
- Suppose you want to save each country flag with the name of the country and the country code, instead of just the country code. Now you need to make two HTTP requests per flag: one to get the flag image itself, the other to get the metadata.json file in the same directory as the image—that's where the name of the country is recorded.
- Coordinating multiple requests in the same task is easy in the threaded script: just make one request then the other, blocking the thread twice, and keeping both pieces of data (country code and name) in local variables, ready to use when saving the files. If you needed to do the same in an asynchronous script with callbacks, you needed nested functions so that the country code and name were available in their closures until you could save the file, because each callback runs in a different local scope. The `await` keyword provides relief from that, allowing you to drive the asynchronous requests one after the other, sharing the local scope of the driving coroutine.
- We will implement `get_country`, and rewrite `download_one` to use it.
```python
# get_country
async def get_country(client: httpx.AsyncClient,
                      base_url: str,
                      cc: str) -> str:  # returns a string with the country name
    url = f'{base_url}/{cc}/metadata.json'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()
    metadata = resp.json()  # dict built from the JSON payload
    return metadata['country']  # return the country name

# download_one
async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # hold the semaphore for get_flag
            image = await get_flag(client, base_url, cc)
        async with semaphore:  # hold it again for get_country
            country = await get_country(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        filename = country.replace(' ', '_')  # use the country name to create the file name
        await asyncio.to_thread(save_flag, image, f'{filename}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
```
- We put the calls to `get_flag` and `get_country` in separate `with` blocks controlled by the semaphore, because it's good practice to hold semaphores and locks for the shortest possible time.
- We could schedule both `get_flag` and `get_country` in parallel using `asyncio.gather`, but if `get_flag` raises an exception, there is no image to save, so it's pointless to run `get_country`. But there are cases where it makes sense to use `asyncio.gather` to hit several APIs at the same time instead of waiting for one response before making the next request; see the sketch below.
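A minimal sketch of that parallel variant; `download_one_parallel` is a hypothetical name, and it reuses `get_flag`, `get_country`, and `save_flag` from the examples above:

```python
async def download_one_parallel(client: httpx.AsyncClient,
                                cc: str,
                                base_url: str,
                                semaphore: asyncio.Semaphore) -> None:
    async with semaphore:
        # both requests start immediately; gather awaits until both are done
        image, country = await asyncio.gather(
            get_flag(client, base_url, cc),
            get_country(client, base_url, cc),
        )
    filename = country.replace(' ', '_')
    await asyncio.to_thread(save_flag, image, f'{filename}.gif')
```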
Delegating Tasks to Executors🔗
One important advantage of Node.js over Python for asynchronous programming is the Node.js standard library, which provides async APIs for all I/O—not just for network I/O. In Python, if you’re not careful, file I/O can seriously degrade the performance of asynchronous applications, because reading and writing to storage in the main thread blocks the event loop.
- Above, we used `await asyncio.to_thread(save_flag, image, f'{cc}.gif')` to avoid writing to disk in the event loop thread. Alternatively, we can get a reference to the running event loop and delegate the task to another thread with `loop.run_in_executor`:
```python
loop = asyncio.get_running_loop()       # get the running event loop
loop.run_in_executor(None, save_flag,   # None defaults to a ThreadPoolExecutor
                     image, f'{cc}.gif')  # positional args to the function to run
```
A common pattern in asynchronous APIs is to wrap blocking calls that are implementation details in coroutines using `run_in_executor` internally. That way, you provide a consistent interface of coroutines to be driven with `await`, and hide the threads you need to use for pragmatic reasons.
The main reason to pass an explicit `Executor` to `loop.run_in_executor` is to employ a `ProcessPoolExecutor` if the function to execute is CPU intensive, so that it runs in a different Python process, avoiding contention for the GIL. Because of the high start-up cost, it would be better to start the `ProcessPoolExecutor` in the `supervisor`, and pass it to the coroutines that need to use it.
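A minimal sketch of that pattern; `crunch` and `worker` are hypothetical names:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:  # hypothetical CPU-bound function
    return sum(i * i for i in range(n))

async def worker(executor: ProcessPoolExecutor, n: int) -> int:
    loop = asyncio.get_running_loop()
    # runs crunch in a separate process, avoiding contention for the GIL
    return await loop.run_in_executor(executor, crunch, n)

async def supervisor() -> None:
    with ProcessPoolExecutor() as executor:  # started once, shared by all coroutines
        results = await asyncio.gather(*(worker(executor, n)
                                         for n in (10_000, 20_000)))
        print(results)

if __name__ == '__main__':
    asyncio.run(supervisor())
```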
Writing asyncio Servers🔗
- We will implement a server that provides Unicode character search utilities: first using HTTP with FastAPI, then using plain TCP with `asyncio` only.
```python
from pathlib import Path
from unicodedata import name

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from charindex import InvertedIndex

STATIC_PATH = Path(__file__).parent.absolute() / 'static'  # pathlib overloads the / operator

app = FastAPI(  # the ASGI app
    title='Mojifinder Web',
    description='Search for Unicode characters by name.',
)

class CharName(BaseModel):  # a pydantic schema for the JSON response, with char and name fields
    char: str
    name: str

def init(app):  # build the index and load the static HTML form
    app.state.index = InvertedIndex()
    app.state.form = (STATIC_PATH / 'form.html').read_text()

init(app)  # run init when this module is loaded by the ASGI server

@app.get('/search', response_model=list[CharName])
async def search(q: str):  # FastAPI assumes that any parameter not in the route path will be passed in the HTTP query string
    chars = sorted(app.state.index.search(q))
    return ({'char': c, 'name': name(c)} for c in chars)  # returns an iterable of dicts; FastAPI converts it according to response_model

@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form():  # a regular function can also be used to produce responses
    return app.state.form

# no main function; run with an ASGI server such as uvicorn
```
- FastAPI is built on the Starlette ASGI toolkit, which in turn uses `asyncio`.
An asyncio TCP Server🔗
```python
async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
    server = await asyncio.start_server(  # gets an instance of asyncio.Server (a TCP socket server)
        functools.partial(finder, index),  # client_connected_cb: a callback, function or coroutine, run when a client connects, accepting an asyncio.StreamReader and an asyncio.StreamWriter; our finder coroutine also needs an index, so we use functools.partial to bind that parameter and obtain a callable that takes reader and writer
        host, port)
    socket_list = cast(tuple[TransportSocket, ...], server.sockets)  # cast is needed because typeshed has an outdated type hint for the sockets property of the Server class
    addr = socket_list[0].getsockname()
    print(f'Serving on {addr}. Hit CTRL-C to stop.')
    await server.serve_forever()  # serve forever

def main(host: str = '127.0.0.1', port_arg: str = '2323'):
    port = int(port_arg)
    print('Building index.')
    index = InvertedIndex()
    try:
        asyncio.run(supervisor(index, host, port))  # start the event loop driving supervisor
    except KeyboardInterrupt:  # CTRL-C to shut down
        print('\nServer shut down.')

if __name__ == '__main__':
    main(*sys.argv[1:])
```
```python
# rest of the implementation
import asyncio
import functools
import sys
from asyncio.trsock import TransportSocket
from typing import cast

from charindex import InvertedIndex, format_results

CRLF = b'\r\n'
PROMPT = b'?> '

async def finder(index: InvertedIndex,
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter) -> None:
    client = writer.get_extra_info('peername')  # remote client address
    while True:
        writer.write(PROMPT)  # can't await! .write is a plain method; this line sends the ?> prompt
        await writer.drain()  # must await! .drain is a coroutine, so drive it with await
        data = await reader.readline()  # coroutine that returns bytes
        if not data:
            break
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:  # can happen when the user hits Ctrl-C and the telnet client sends control bytes; replace the query with a null character
            query = '\x00'
        print(f' From {client}: {query!r}')  # log the query
        if query:
            if ord(query[:1]) < 32:  # a control character ends the session
                break
            results = await search(query, index, writer)
            print(f' To {client}: {results} results.')
    writer.close()  # close the stream
    await writer.wait_closed()  # wait for it to close
    print(f'Close {client}.')  # log the end of the client's session to the server console
```
```python
# search
async def search(query: str,  # must be a coroutine, because it awaits on writer.drain
                 index: InvertedIndex,
                 writer: asyncio.StreamWriter) -> int:
    chars = index.search(query)  # query the inverted index
    lines = (line.encode() + CRLF for line  # generator expression yields byte strings encoded in UTF-8, each with a Unicode code point and its name
             in format_results(chars))
    writer.writelines(lines)  # send the lines
    await writer.drain()  # drain
    status_line = f'{"─" * 66} {len(chars)} found'  # build the status line and send it
    writer.write(status_line.encode() + CRLF)
    await writer.drain()
    return len(chars)
```
Asynchronous Iteration and Asynchronous Iterables🔗
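An `async for` loop drives an asynchronous iterable: its `__anext__` method is a coroutine, so retrieving each item may await on I/O. The easiest way to produce one is an asynchronous generator. A minimal sketch, with hypothetical names:

```python
import asyncio

async def ticker(n: int):  # asynchronous generator: async def + yield
    for i in range(n):
        await asyncio.sleep(0.1)  # pretend each item requires I/O
        yield i

async def main() -> None:
    async for i in ticker(3):  # async for awaits __anext__ for each item
        print(i)

asyncio.run(main())
```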
Async Beyond asyncio: Curio🔗
Python's `async/await` language constructs are not tied to any specific event loop or library. Thanks to the extensible API provided by special methods, anyone can write their own asynchronous runtime environment and framework to drive native coroutines, asynchronous generators, etc.
That's what David Beazley did in his Curio project. Curio has a cleaner API and a simpler implementation, compared to `asyncio`.
```python
#!/usr/bin/env python3
from curio import run, TaskGroup
import curio.socket as socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4

async def probe(domain: str) -> tuple[str, bool]:  # no need to get the event loop
    try:
        await socket.getaddrinfo(domain, None)  # a top-level function of curio.socket, not a method of an event loop object
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

async def main() -> None:
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
    domains = (f'{name}.dev'.lower() for name in names)
    async with TaskGroup() as group:  # TaskGroup is a core concept in Curio, to monitor and control several coroutines, and to make sure they are all executed and cleaned up
        for domain in domains:
            await group.spawn(probe, domain)  # start a coroutine
        async for task in group:  # async for iterates over a TaskGroup, yielding Task instances as they are completed
            domain, found = task.result
            mark = '+' if found else ' '
            print(f'{mark} {domain}')

if __name__ == '__main__':
    run(main())
```
Type Hinting Asynchronous Objects🔗
- The return type of a native coroutine describes what you get when you `await` on that coroutine, which is the type of the object that appears in the `return` statements in the body of the native coroutine function.
```python
async def probe(domain: str) -> tuple[str, bool]:
    try:
        await socket.getaddrinfo(domain, None)
    except socket.gaierror:
        return (domain, False)
    return (domain, True)
```
- If you need to annotate a parameter that takes a coroutine object, the relevant generic types are listed below; see the usage sketch at the end of this section.
```python
class typing.Coroutine(Awaitable[V_co], Generic[T_co, T_contra, V_co]):
    ...

# Python 3.5/3.6+
class typing.AsyncContextManager(Generic[T_co]):
    ...
class typing.AsyncIterable(Generic[T_co]):
    ...
class typing.AsyncIterator(AsyncIterable[T_co]):
    ...
class typing.AsyncGenerator(AsyncIterator[T_co], Generic[T_co, T_contra]):
    ...
class typing.Awaitable(Generic[T_co]):
    ...
```
With Python 3.9+, use the `collections.abc` equivalents of these. Three important aspects of these generics:
- If a formal type parameter defines a type for data that comes out of the object, it can be covariant.
- If a formal type parameter defines a type for data that goes into the object after its initial construction, it can be contravariant.
- `AsyncGenerator` has no return type, in contrast with `typing.Generator`. Returning a value by raising `StopIteration(value)` was one of the hacks that enabled generators to operate as coroutines and support `yield from`, as we saw in "Classic Coroutines". There is no such overlap among the asynchronous objects: `AsyncGenerator` objects don't return values, and are completely separate from native coroutine objects, which are annotated with `typing.Coroutine`.
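A minimal usage sketch annotating a parameter that takes a coroutine object, using the `collections.abc` generics; `run_with_timeout` is a hypothetical helper:

```python
import asyncio
from collections.abc import Coroutine
from typing import Any

async def probe(domain: str) -> tuple[str, bool]:
    ...

async def run_with_timeout(coro: Coroutine[Any, Any, tuple[str, bool]],
                           seconds: float) -> tuple[str, bool]:
    # awaiting the coroutine produces its declared return type
    return await asyncio.wait_for(coro, timeout=seconds)

# usage (inside a coroutine): result = await run_with_timeout(probe('if.dev'), 2.0)
```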
How Async Works and How It Doesn't🔗
Running Circles Around Blocking Calls🔗
Ryan Dahl, the inventor of Node.js, introduced the philosophy of his project by saying: "We're doing I/O completely wrong." He defines a blocking function as one that does file or network I/O, and argues that we can't treat them the way we treat non-blocking functions.
The Myth of I/O-Bound Systems🔗
A commonly repeated meme is that asynchronous programming is good for "I/O-bound systems." There are no I/O-bound systems. You may have I/O-bound functions.
Given that any nontrivial system will have CPU-bound functions, dealing with them is the key to success in asynchronous programming.
Avoiding CPU-Bound Traps🔗
- If you are using Python at scale, you should have some automated tests designed specifically to detect performance regressions as soon as they appear. This is critically important with asynchronous code, but also relevant to threaded Python code, because of the GIL. If you wait until the slowdown starts bothering the development team, it's too late; the fix will probably require some major makeover.
Here are some options for when you identify a CPU-hogging bottleneck:
- Delegate the task to a Python process pool
- Delegate the task to an external task queue
- Rewrite the relevant code in Cython, C, Rust, or some other language that compiles to machine code and interfaces with the Python/C API, preferably releasing the GIL
- Decide that you can afford the performance hit and do nothing, but record the decision to make it easier to revert to it later