
Asynchronous Programming🔗

Three main topics will be addressed:

  • Python’s async def, await, async with and async for constructs
  • Objects supporting those constructs: native coroutines and asynchronous variants of context managers, iterables, generators, and comprehensions.
  • asyncio and other asynchronous libraries

A Few Definitions🔗

Python offers three kinds of coroutines:

  • Native Coroutines
    • A coroutine function defined with async def. A native coroutine can delegate to another native coroutine using the await keyword, similar to how classic coroutines use yield from.
    • NOTE: await can’t be used outside an async def function, but an async def function does not have to contain await.
  • Classic Coroutines
    • A generator function that consumes data sent to it via my_coro.send(data) calls, and reads that data by using yield in an expression. Classic coroutines can delegate to other classic coroutines using yield from.
    • Classic coroutines can’t be driven by await and are no longer supported by asyncio.
  • Generator Based Coroutines
    • A generator function decorated with @types.coroutine. That decorator makes the generator function compatible with the await keyword (see the sketch after this list).
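A minimal sketch of how these flavors interact: a generator-based coroutine made awaitable by @types.coroutine, awaited from a native coroutine, and driven manually with .send the way an event loop would (the names sleepy and caller are illustrative):

import types

@types.coroutine
def sleepy(value):              # generator-based coroutine
    yield                       # suspension point an event loop could drive
    return value

async def caller():             # native coroutine
    return await sleepy(42)     # the decorated generator is awaitable

coro = caller()
coro.send(None)                 # advances to the yield inside sleepy
try:
    coro.send(None)             # resumes; caller returns, raising StopIteration
except StopIteration as exc:
    print(exc.value)            # -> 42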

Asynchronous generator

  • A generator function defined with async def and using yield in its body. It returns an asynchronous generator object that implements __anext__, a coroutine method to retrieve the next item.
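A minimal sketch of an asynchronous generator and the async for loop that consumes it (ticker is an illustrative name):

import asyncio

async def ticker(n):
    for i in range(n):
        await asyncio.sleep(0.01)  # may await between yields
        yield i                    # yield in async def makes this an async generator

async def main():
    async for i in ticker(3):      # async for drives __anext__
        print(i)

asyncio.run(main())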

An asyncio Example: Probing Domains🔗

A search for available domains for a Python blog:

#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4


async def probe(domain: str) -> tuple[str, bool]:   # returns a tuple with the domain name and a boolean; True means the domain resolved
    loop = asyncio.get_running_loop() # get a reference to the asyncio event loop
    try:
        await loop.getaddrinfo(domain, None) # returns a list of five-part tuples with the parameters to connect to the given address using a socket
    except socket.gaierror:
        return (domain, False)
    return (domain, True)


async def main() -> None: # main must be a coroutine
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # generator to yield Python keywords
    domains = (f'{name}.dev'.lower() for name in names) # append .dev suffix
    coros = [probe(domain) for domain in domains] # list of coroutines
    for coro in asyncio.as_completed(coros): # as_completed is a generator that yields coroutines returning results in the order they are completed
        domain, found = await coro  # since this coroutine is completed await doesn't block
        mark = '+' if found else ' '
        print(f'{mark} {domain}')


if __name__ == '__main__':
    asyncio.run(main()) # starts the event loop and returns only when event loop exits

Guido’s Trick to Read Asynchronous Code🔗

  • Pretend the async and await keywords are not there. If you do that, you will realize that coroutines read like plain old sequential functions.
  • Here, during execution of the probe('if.dev') coroutine, a new coroutine object is created by getaddrinfo. Awaiting it starts the low-level query and yields control back to the event loop. The event loop can then drive other pending coroutine objects. When a response comes back, that specific coroutine object resumes and returns control to probe('if.dev'), which was suspended at await and can now handle possible exceptions and return the result tuple.
  • asyncio.as_completed and await can be applied to any awaitable object

New Concept : Awaitable🔗

The for keyword works with iterables. The await keyword works with awaitables.

These are the awaitables you usually encounter:

  • A native coroutine object, which you get by calling a native coroutine function
  • An asyncio.Task, which you usually get by passing a coroutine object to asyncio.create_task()

However, end-user code does not always need to await on a Task. We use asyncio.create_task(one_coro()) to schedule one_coro for concurrent execution, without waiting for its return. That’s what we did with the spinner coroutine in spinner_async.py

If you don’t expect to cancel the task or wait for it, there is no need to keep the Task object returned from create_task. Creating the task is enough to schedule the coroutine to run.
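A small sketch of that pattern, using a hypothetical one_coro coroutine (note that the asyncio docs now recommend keeping a reference to tasks you care about, since the event loop holds only a weak reference):

import asyncio

async def one_coro():
    await asyncio.sleep(0.1)
    print('one_coro done')

async def main():
    asyncio.create_task(one_coro())  # scheduled for concurrent execution, not awaited
    await asyncio.sleep(0.2)         # main keeps running while one_coro executes
    print('main done')

asyncio.run(main())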

When implementing asynchronous libraries or contributing to asyncio itself, you may also deal with these lower-level awaitables:

  • An object with an __await__ method that returns an iterator; for example, an asyncio.Future instance (asyncio.Task is a subclass of asyncio.Future)
  • Objects written in other languages using the Python/C API with a tp_as_async.am_await function, returning an iterator (similar to __await__ method)
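A minimal sketch of the first kind: a class whose __await__ method returns an iterator. Ready is an illustrative name; awaiting it completes immediately with a stored value:

import asyncio

class Ready:
    def __init__(self, value):
        self.value = value

    def __await__(self):       # must return an iterator
        return self.value      # StopIteration carries this value to await
        yield                  # unreachable, but makes __await__ a generator

async def main():
    print(await Ready(42))     # -> 42

asyncio.run(main())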

Downloading with asyncio and HTTPX🔗

As of Python 3.10, asyncio supports only TCP and UDP directly, and there are no asynchronous HTTP client or server packages in the standard library, so we will use HTTPX in all the HTTP client examples.

flags_asyncio.py: startup functions

def download_many(cc_list: list[str]) -> int: # needs to be a plain function, not a coroutine, as it is passed to and called by main
    return asyncio.run(supervisor(cc_list)) # run the event loop driving the supervisor(cc_list) coroutine object until it returns; this call blocks while the event loop runs

async def supervisor(cc_list: list[str]) -> int:
    async with AsyncClient() as client: # asynchronous HTTP client operations in httpx are methods of AsyncClient
        to_do = [download_one(client, cc)
                 for cc in sorted(cc_list)] # list of coroutines
        res = await asyncio.gather(*to_do) # asyncio.gather awaits the coroutines and collects their results when all of them complete

    return len(res) # return the length of the list returned by asyncio.gather

if __name__ == '__main__':
    main(download_many)

flags_asyncio.py: imports and download functions

import asyncio

from httpx import AsyncClient # must be installed

from flags import BASE_URL, save_flag, main # reuse code from flags.py

async def download_one(client: AsyncClient, cc: str): # download_one must be a native coroutine, so it can await on get_flag
    image = await get_flag(client, cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

async def get_flag(client: AsyncClient, cc: str) -> bytes: # needs to receive the AsyncClient instance; its .get method returns a response object that is also an asynchronous context manager
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=6.1,
                                  follow_redirects=True) # network I/O operations are coroutine methods, driven asynchronously by the event loop
    return resp.read()

The Secret of Native Coroutines: Humble Generators🔗

(Figure: await channel diagram)

  • A key difference between classic coroutines and flags_asyncio.py is that there are no visible .send() calls or yield expressions in the latter. Your code sits between the asyncio library and the asynchronous libraries you are using, such as HTTPX.
  • Under the hood, the asyncio event loop makes the .send calls that drive your coroutines, and your coroutines await on other coroutines, including library coroutines. As mentioned, await borrows most of its implementation from yield from, which also makes .send calls to drive coroutines.
  • The await chain eventually reaches a low-level awaitable, which returns a generator that the event loop can drive in response to events such as timers or network I/O. The low-level awaitables and generators at the end of these await chains are implemented deep in the libraries, are not part of their APIs, and may be Python/C extensions.
  • Using functions like asyncio.gather and asyncio.create_task, you can start multiple concurrent await channels, enabling concurrent execution of multiple I/O operations driven by a single event loop, in a single thread.
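A sketch of what that driving looks like: manually sending into a native coroutine the way the event loop does (this relies on CPython’s asyncio.sleep(0), which yields exactly once):

import asyncio

async def add_one(x):
    await asyncio.sleep(0)  # a suspension point
    return x + 1

coro = add_one(41)
coro.send(None)             # runs until the first suspension
try:
    coro.send(None)         # resumes; the coroutine returns
except StopIteration as exc:
    print(exc.value)        # -> 42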

The All-or-Nothing Problem🔗

We could not reuse the get_flag function from flags.py. We had to rewrite it as a coroutine to use the asynchronous API of HTTPX. For peak performance with asyncio, we must replace every function that does I/O with an asynchronous version that is activated with await or asyncio.create_task, so that control is given back to the event loop while the function waits for I/O. If you can’t rewrite a blocking function as a coroutine, you should run it in a separate thread or process, as sketched below.
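A minimal sketch of that escape hatch, assuming Python 3.9+ for asyncio.to_thread; blocking_io is a stand-in for any function you can’t rewrite:

import asyncio
import time

def blocking_io():
    time.sleep(0.1)  # stands in for a blocking call you can't rewrite
    return 'done'

async def main():
    result = await asyncio.to_thread(blocking_io)  # runs in a worker thread
    print(result)

asyncio.run(main())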

Asynchronous Context Managers🔗

  • A context manager implements __enter__ and __exit__ (for graceful setup and cleanup).
  • In an asynchronous driver like asyncpg, setup and wrap-up need to be coroutines so that other operations can happen concurrently. The classic with statement doesn’t support that, which is why async with was introduced, relying on __aenter__ and __aexit__ methods implemented as coroutines.
async with connection.transaction():
    await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
  • Using coroutines to implement Transaction as an asynchronous context manager allows asyncpg to handle many transactions concurrently.
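You can also write your own asynchronous context manager. A minimal sketch using contextlib.asynccontextmanager (timed is an illustrative name, unrelated to asyncpg):

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def timed(label):
    t0 = asyncio.get_running_loop().time()
    try:
        yield                               # the body of the async with runs here
    finally:
        dt = asyncio.get_running_loop().time() - t0
        print(f'{label}: {dt:.3f}s')

async def main():
    async with timed('nap'):
        await asyncio.sleep(0.1)

asyncio.run(main())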

Enhancing the asyncio Downloader🔗

  • Our last implementation adds the progress display and error handling done in the flags2 examples.

Using asyncio.as_completed and a Thread🔗

  • In the previous example we used asyncio.gather, which returns a list with the results of the coroutines in the order they were submitted; this implies it only returns when all of them are completed.
  • However, to update a progress bar we need to get results as they are done.
  • We will use the asyncio equivalent of the as_completed generator function that we used in the thread pool example.
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient, # very similar to the sequential implementation
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True) # .get is an AsyncClient coroutine method, so we await it
    resp.raise_for_status()
    return resp.content

async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore: # use the semaphore as an async context manager so the program as a whole is not blocked; only this coroutine is suspended when the semaphore counter is zero
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc: # error handling as before
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif') # save_flag does file I/O; to avoid blocking the event loop, run it in a thread
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status

Throttling Requests with a Semaphore🔗

  • A network client should be throttled to avoid accidentally launching a denial-of-service attack on a server.
  • A semaphore is a synchronization primitive, more flexible than a lock. It can be held by multiple coroutines, with a configurable max number.
  • In the thread pool implementation of flags2, the max_workers argument throttled the number of concurrent requests. In the example above, the supervisor function creates an asyncio.Semaphore, which is passed as an argument to download_one.
  • Computer scientist Edsger W. Dijkstra invented the semaphore in the early 1960s. It’s a simple idea, but it’s so flexible that most other synchronization objects—such as locks and barriers—can be built on top of semaphores. There are three Semaphore classes in Python’s standard library: one in threading, another in multiprocessing, and a third one in asyncio.
  • An asyncio.Semaphore has an internal counter that is decremented whenever we await on the .acquire() coroutine method, and incremented when we call the .release() method
    semaphore = asyncio.Semaphore(concur_req)
  • Awaiting on .acquire() causes no delay when the counter is greater than zero, but if the counter is zero, .acquire() suspends the awaiting coroutine until some other coroutine calls .release() on the same Semaphore, thus incrementing the counter. Instead of using those methods directly, it’s safer to use the semaphore as an asynchronous context manager:
async with semaphore:
            image = await get_flag(client, base_url, cc)
  • The Semaphore.__aenter__ coroutine method awaits for .acquire(), and its __aexit__ coroutine method calls .release(). That snippet guarantees that no more than concur_req instances of the get_flag coroutine will be active at any time. Each of the Semaphore classes in the standard library has a BoundedSemaphore subclass that enforces an additional constraint, sketched below.
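A small sketch of that additional constraint: a BoundedSemaphore raises if it is released more times than it was acquired, whereas a plain Semaphore would silently let the counter grow:

import asyncio

async def main():
    sem = asyncio.BoundedSemaphore(1)
    async with sem:
        pass            # acquired and released once: fine
    try:
        sem.release()   # one release too many
    except ValueError as exc:
        print(exc)      # BoundedSemaphore released too many times

asyncio.run(main())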
# rest of the code for above asyncio downloader
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]: # same args as download_many
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req) # semaphore
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)] # create list of coroutine objects
        to_do_iter = asyncio.as_completed(to_do) # iterator that yields coroutines as they are completed
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # tqdm to display progress
        error: httpx.HTTPError | None = None # exceptions
        for coro in to_do_iter: # iterate over tasks
            try:
                status = await coro # await for result
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc # scope of exc limited here, so assign to error
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc
            except KeyboardInterrupt:
                break

            if error:
                status = DownloadStatus.ERROR # set a status if failed.
                if verbose:
                    url = str(error.request.url)
                    cc = Path(url).stem.upper()
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro) # instantiate supervisor coroutine object and pass it to the event loop with asyncio.run

    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Making Multiple Requests for Each Download🔗

  • Suppose you want to save each country flag with the name of the country and the country code, instead of just the country code. Now you need to make two HTTP requests per flag: one to get the flag image itself, the other to get the metadata.json file in the same directory as the image—that’s where the name of the country is recorded.
  • Coordinating multiple requests in the same task is easy in the threaded script: just make one request then the other, blocking the thread twice, and keeping both pieces of data (country code and name) in local variables, ready to use when saving the files. If you needed to do the same in an asynchronous script with callbacks, you needed nested functions so that the country code and name were available in their closures until you could save the file, because each callback runs in a different local scope. The await keyword provides relief from that, allowing you to drive the asynchronous requests one after the other, sharing the local scope of the driving coroutine.
  • We will implement two more functions: get_country, and a new version of download_one
# get_country
async def get_country(client: httpx.AsyncClient,
                      base_url: str,
                      cc: str) -> str: # string with country name returned
    url = f'{base_url}/{cc}/metadata.json'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()
    metadata = resp.json() # dict built from json
    return metadata['country'] # return country name


# download_one
async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore: # hold semaphore
            image = await get_flag(client, base_url, cc)
        async with semaphore: # again hold for get_country
            country = await get_country(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        filename = country.replace(' ', '_') # country name to create files
        await asyncio.to_thread(save_flag, image, f'{filename}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
  • We put the calls to get_flag and get_country in separate async with blocks controlled by the semaphore, because it’s good practice to hold semaphores and locks for the shortest possible time.
  • We could schedule both get_flag and get_country in parallel using asyncio.gather, but if get_flag raises an exception, there is no image to save, so it’s pointless to run get_country. But there are cases where it makes sense to use asyncio.gather to hit several APIs at the same time instead of waiting for one response before making the next request, as sketched below.
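A sketch of that asyncio.gather alternative, reusing get_flag and get_country from the example above (download_both is a hypothetical name; remember that if get_flag fails, the get_country request is wasted):

async def download_both(client: httpx.AsyncClient,
                        cc: str,
                        base_url: str,
                        semaphore: asyncio.Semaphore) -> tuple[bytes, str]:
    async with semaphore:
        image, country = await asyncio.gather(  # both requests in flight at once
            get_flag(client, base_url, cc),
            get_country(client, base_url, cc),
        )
    return image, country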

Delegating Tasks to Executors🔗

One important advantage of Node.js over Python for asynchronous programming is the Node.js standard library, which provides async APIs for all I/O—not just for network I/O. In Python, if you’re not careful, file I/O can seriously degrade the performance of asynchronous applications, because reading and writing to storage in the main thread blocks the event loop.

        await asyncio.to_thread(save_flag, image, f'{cc}.gif')
  • The line above delegates the file I/O to a thread with asyncio.to_thread, which was added in Python 3.9. On earlier versions, we can get a reference to the running event loop and delegate the task to another thread using loop.run_in_executor:
        loop = asyncio.get_running_loop() # get the running event loop
        loop.run_in_executor(None, save_flag, # None defaults to ThreadPoolExecutor
                             image, f'{cc}.gif') # positional args to function to run.

A common pattern in asynchronous APIs is to wrap blocking calls that are implementation details in coroutines using run_in_executor internally. That way, you provide a consistent interface of coroutines to be driven with await, and hide the threads you need to use for pragmatic reasons.

The main reason to pass an explicit Executor to loop.run_in_executor is to employ a ProcessPoolExecutor if the function to execute is CPU intensive, so that it runs in a different Python process, avoiding contention for the GIL. Because of the high start-up cost, it would be better to start the ProcessPoolExecutor in the supervisor, and pass it to the coroutines that need to use it.
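A minimal sketch of that advice, with a hypothetical CPU-bound function crunch; the ProcessPoolExecutor is created once in the supervisor and could be reused by other coroutines:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    return sum(i * i for i in range(n))  # CPU-bound work

async def supervisor() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:  # started once; high start-up cost
        result = await loop.run_in_executor(pool, crunch, 10_000_000)
        print(result)

if __name__ == '__main__':
    asyncio.run(supervisor())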

Writing asyncio Servers🔗

  • We will implement a server that lets users search for Unicode characters by name: first using HTTP with FastAPI, then using plain TCP with asyncio only.
from pathlib import Path
from unicodedata import name

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from charindex import InvertedIndex

STATIC_PATH = Path(__file__).parent.absolute() / 'static' # pathlib overloads the / operator

app = FastAPI( # ASGI app
    title='Mojifinder Web',
    description='Search for Unicode characters by name.',
)

class CharName(BaseModel): # a pydantic schema for the JSON response, with char and name fields
    char: str
    name: str

def init(app): # build the index and load static HTML form
    app.state.index = InvertedIndex()
    app.state.form = (STATIC_PATH / 'form.html').read_text()

init(app) # run init

@app.get('/search', response_model=list[CharName])
async def search(q: str): # FastAPI assumes that any parameter not in the route path will be passed in the HTTP query string
    chars = sorted(app.state.index.search(q))
    return ({'char': c, 'name': name(c)} for c in chars) # return an iterable of dicts; FastAPI converts it to JSON matching the response_model

@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form(): # regular function can be used to produce responses
    return app.state.form

# no main function: to run this app, use an ASGI server such as uvicorn
  • FastAPI is built on the Starlette ASGI toolkit, which in turn uses asyncio.

An asyncio TCP Server🔗

async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
    server = await asyncio.start_server( # gets an instance of asyncio.Server (TCP socket server)
        functools.partial(finder, index), # client_connected_cb, a callback to run when a client connects; it can be a function or coroutine accepting two arguments, an asyncio.StreamReader and an asyncio.StreamWriter. Our finder coroutine also needs an index, so we use functools.partial to bind that parameter and obtain a callable that takes reader and writer
        host, port)

    socket_list = cast(tuple[TransportSocket, ...], server.sockets) # cast is needed because typeshed has an outdated type hint for the sockets property of the Server class
    addr = socket_list[0].getsockname()
    print(f'Serving on {addr}. Hit CTRL-C to stop.')
    await server.serve_forever() # serve forever

def main(host: str = '127.0.0.1', port_arg: str = '2323'):
    port = int(port_arg)
    print('Building index.')
    index = InvertedIndex()
    try:
        asyncio.run(supervisor(index, host, port)) # start the event loop, running supervisor
    except KeyboardInterrupt: # interrupt to shut down
        print('\nServer shut down.')

if __name__ == '__main__':
    main(*sys.argv[1:])
# rest of the implementation
import asyncio
import functools
import sys
from asyncio.trsock import TransportSocket
from typing import cast

from charindex import InvertedIndex, format_results

CRLF = b'\r\n'
PROMPT = b'?> '

async def finder(index: InvertedIndex,
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter) -> None:
    client = writer.get_extra_info('peername') # remote client name
    while True:
        writer.write(PROMPT)  # can't await! writer.write is a plain function; this sends the ?> prompt
        await writer.drain()  # must await! writer.drain is a coroutine, so drive it with await
        data = await reader.readline() # coroutine that returns bytes
        if not data:
            break
        try:
            query = data.decode().strip()
        except UnicodeDecodeError: # can happen when the user hits Ctrl-C and the telnet client sends control bytes; replace the query with a null character
            query = '\x00'
        print(f' From {client}: {query!r}') # log
        if query:
            if ord(query[:1]) < 32:
                break
            results = await search(query, index, writer)
            print(f'   To {client}: {results} results.')

    writer.close() # Close stream
    await writer.wait_closed() # wait for it to close
    print(f'Close {client}.') # log end of client's session to server console
# search 
async def search(query: str, # must be coroutine
                 index: InvertedIndex,
                 writer: asyncio.StreamWriter) -> int:
    chars = index.search(query) # query inverted index
    lines = (line.encode() + CRLF for line # generator expression yields lines as byte strings encoded in UTF-8
                in format_results(chars))
    writer.writelines(lines) # send the lines
    await writer.drain() # drain
    status_line = f'{"─" * 66} {len(chars)} found' # build status and send
    writer.write(status_line.encode() + CRLF)
    await writer.drain()
    return len(chars)

Asynchronous Iteration and Asynchronous Iterables🔗

Async Beyond asyncio: Curio🔗

Python’s async/await language constructs are not tied to any specific event loop or library. Thanks to the extensible API provided by special methods, anyone can write their own asynchronous runtime environment and framework to drive native coroutines, asynchronous generators, etc.

That’s what David Beazley did in his Curio project. Curio has a cleaner API and a simpler implementation, compared to asyncio.

#!/usr/bin/env python3
from curio import run, TaskGroup
import curio.socket as socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4


async def probe(domain: str) -> tuple[str, bool]: # no need to get a reference to the event loop
    try:
        await socket.getaddrinfo(domain, None) # getaddrinfo is a top-level function of curio.socket, not a method of an event loop object
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

async def main() -> None:
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
    domains = (f'{name}.dev'.lower() for name in names)
    async with TaskGroup() as group: # TaskGroup is a core concept in Curio: it monitors and controls several coroutines, and makes sure they are all executed and cleaned up
        for domain in domains:
            await group.spawn(probe, domain) # start a coroutine as a Task
        async for task in group: # async iteration over a TaskGroup yields Task instances as they are completed
            domain, found = task.result
            mark = '+' if found else ' '
            print(f'{mark} {domain}')

if __name__ == '__main__':
    run(main())

Type Hinting Asynchronous Objects🔗

  • The return type of a native coroutine describes what you get when you await that coroutine, which is the type of the object that appears in the return statements in the body of the native coroutine function.
async def probe(domain: str) -> tuple[str, bool]:
    try:
        await socket.getaddrinfo(domain, None)
    except socket.gaierror:
        return (domain, False)
    return (domain, True)
  • If you need to annotate a parameter that takes a coroutine object, the generic type is:
class typing.Coroutine(Awaitable[V_co], Generic[T_co, T_contra, V_co]):

# python 3.5/3.6+
class typing.AsyncContextManager(Generic[T_co]):
    ...
class typing.AsyncIterable(Generic[T_co]):
    ...
class typing.AsyncIterator(AsyncIterable[T_co]):
    ...
class typing.AsyncGenerator(AsyncIterator[T_co], Generic[T_co, T_contra]):
    ...
class typing.Awaitable(Generic[T_co]):
    ...

With Python 3.9+, use the collections.abc equivalents of these. Three important aspects of these generics:

  • If a formal type parameter defines a type for data that comes out of the object, it can be covariant.
  • If a formal type parameter defines a type for data that goes into the object after its initial construction, it can be contravariant.
  • AsyncGenerator has no return type, in contrast with typing.Generator. Returning a value by raising StopIteration(value) was one of the hacks that enabled generators to operate as coroutines and support yield from, as we saw in “Classic Coroutines”. There is no such overlap among the asynchronous objects: AsyncGenerator objects don’t return values, and are completely separate from native coroutine objects, which are annotated with typing.Coroutine.
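A minimal sketch of such annotations in practice, using the collections.abc generics from Python 3.9+ (the function names are illustrative):

from collections.abc import AsyncIterator, Coroutine
from typing import Any

async def countdown(n: int) -> AsyncIterator[int]:  # an async generator function
    while n:
        yield n
        n -= 1

def schedule(coro: Coroutine[Any, Any, str]) -> None:
    """Accepts a coroutine object that eventually returns a str."""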

How Async Works and How It Doesn’t🔗

Running Circles Around Blocking Calls🔗

Ryan Dahl (inventor of Node.js) introduces the philosophy of his project by saying “We’re doing I/O completely wrong.” He defines a blocking function as one that does file or network I/O, and argues that we can’t treat them as we treat nonblocking functions.

The Myth of I/O Bound Systems🔗

A commonly repeated meme is that asynchronous programming is good for “I/O-bound systems.” There are no I/O-bound systems. You may have I/O-bound functions.

Given that any nontrivial system will have CPU-bound functions, dealing with them is the key to success in asynchronous programming.

Avoiding CPU-Bound Traps🔗

  • If you are using Python at scale, you should have some automated tests designed specifically to detect performance regressions as soon as they appear. This is critically important with asynchronous code, but also relevant to threaded Python code, because of the GIL. If you wait until the slowdown starts bothering the development team, it’s too late. The fix will probably require some major makeover.

Here are some options for when you identify a CPU-hogging bottleneck:

  • Delegate the task to a Python process pool
  • Delegate the task to an external task queue
  • Rewrite the relevant code in Cython, C, Rust, or some other language that compiles to machine code and interfaces with the Python/C API, preferably releasing the GIL
  • Decide that you can afford the performance hit and do nothing, but record the decision to make it easier to revert to it later