
Concurrency Models in Python🔗

  • Rob Pike (Co-inventor of the Go language) explains concurrency as dealing with lots of things at once, while parallelism is all about doing lots of things at once.
  • Concurrency provides a way to structure a solution to solve a problem that may (but not necessarily) be parallelizable.

The Big Picture🔗

  • Concurrent programming is hard: it is easy to start threads or processes, but keeping track of them is very difficult.
  • Starting a thread or a process is not cheap, so you don’t want to start one just to perform a single computation and quit. Often you want to amortize the startup cost by making each thread or process a “worker” that enters a loop and stands by for inputs to work on, which further complicates communication.
  • A coroutine is cheap to start. If you start a coroutine using the await keyword, it is easy to get the value it returns, it can be safely cancelled, and you have a clear site to catch exceptions. But coroutines are often started by an asynchronous framework, making them hard to monitor.
  • Finally, Python coroutines and threads are not suitable for CPU-intensive tasks, as we’ll see.

A Bit of Jargon🔗

  • Concurrency: the ability to handle multiple tasks at once, making progress in parallel or one at a time.
  • Parallelism: the ability to execute multiple computations at the same time. Requires a multicore CPU, multiple CPUs, a GPU, or a cluster.
  • Execution Unit: general term for objects that execute code concurrently, each with independent state and call stack, e.g. processes, threads, and coroutines.
  • Process: An instance of a computer program while it is running, using memory and a slice of CPU time. Processes communicate via pipes, sockets, or memory-mapped files, all of which can only carry raw bytes. Python objects must be serialized into raw bytes to pass from one process to another. This is costly, and not all Python objects are serializable. A process can spawn subprocesses, called child processes. Processes allow preemptive multitasking: the OS scheduler can suspend a process to let other processes run.
  • Thread: An execution unit within a single process. When a process starts, it uses a single thread (the main thread). A process can create more threads to operate concurrently by calling operating system APIs. Threads within a process share the same memory space, allowing easy data sharing between threads; but beware, this can also cause corruption when threads update the same memory area. Threads also enable preemptive multitasking.
  • Coroutine: A function that can suspend itself and resume later. In Python, classic coroutines are built from generator functions, and native coroutines are defined with async def.
  • Queue: A data structure that lets us put and get items in FIFO order. Queues allow separate execution units to exchange application data and control messages, such as error codes and signals to terminate. The implementation of a queue varies according to the underlying concurrency model: the queue module in the standard library provides queue classes to support threads, while the multiprocessing and asyncio packages have their own queue classes. (See the producer/consumer sketch after this list.)
  • Lock: An object that execution units can use to synchronize their execution and avoid data corruption. The implementation of a lock depends on the underlying concurrency model.
  • Contention: Dispute over a limited asset. Resource contention happens when multiple execution units try to access a shared resource, such as a lock or storage. There is also CPU contention, when compute-intensive processes or threads must wait for the OS scheduler to give them a share of the CPU time.
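
To make the Queue jargon concrete, here is a minimal producer/consumer sketch (my own example, not from the original notes) using the thread-safe queue.Queue from the standard library; None acts as the control message telling the consumer to terminate:

import queue
import threading

def producer(q: queue.Queue) -> None:
    for n in range(5):
        q.put(n)        # items come out in FIFO order; Queue does its own locking internally
    q.put(None)         # control message: tell the consumer to stop

def consumer(q: queue.Queue) -> None:
    while (item := q.get()) is not None:
        print(f'got {item}')

q: queue.Queue = queue.Queue()
t = threading.Thread(target=consumer, args=(q,))
t.start()
producer(q)
t.join()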

Processes, Threads and Python’s Infamous GIL🔗

  • Each instance of the Python interpreter is a process. We can use the multiprocessing or concurrent.futures libraries to start additional processes. The subprocess package is designed to launch processes to run external programs.
  • The Python interpreter uses a single thread to run the user’s program and the memory garbage collector. We can use the threading library to create additional threads.
  • Access to object reference counts and other internal interpreter state is controlled by a lock, the Global Interpreter Lock (GIL). Only one Python thread can hold the GIL at any time. This means that only one thread can execute Python code at any time, regardless of the number of CPU cores.
  • To prevent a Python thread from holding the GIL indefinitely, Python’s bytecode interpreter pauses the current Python thread every 5ms by default, releasing the GIL (see the snippet after this list).
  • When we write Python code, we have no control over the GIL. But a built-in function or an extension written in C, or in any other language that interfaces at the Python/C API level, can release the GIL while running time-consuming tasks.
  • Every Python standard library function that makes a syscall releases the GIL. This includes all functions that perform disk I/O, network I/O, and time.sleep().
  • Extensions that integrate at the Python/C API level can also launch other non-Python threads that are not affected by the GIL. Such GIL-free threads generally can’t change Python objects, but they can read from and write to the memory underlying objects that support the buffer protocol, such as bytearray, array.array, and NumPy arrays.
  • The effect of the GIL on network programming with Python threads is relatively small, because I/O functions release the GIL, and reading or writing to the network always implies high latency compared to reading and writing to memory.
  • Contention over the GIL slows down compute-intensive Python threads. Sequential, single-threaded code is simpler and faster for such tasks.
  • To run CPU-intensive Python code on multiple cores, you must use multiple Python processes.
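
As an aside, CPython exposes that 5ms figure as the thread “switch interval”; the snippet below (a small illustration, not from the original notes) inspects and tunes it:

import sys

print(sys.getswitchinterval())   # 0.005 by default: the running thread is preempted every 5ms
sys.setswitchinterval(0.01)      # the interval can be tuned, e.g. to 10ms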

A Concurrent Hello World🔗

Spinner with Threads🔗

Start a function that blocks for 3 seconds while animating characters in the terminal, to let the user know that the program is “thinking” and not stalled.

import itertools
import time
from threading import Thread, Event

def spin(msg: str, done: Event) -> None:    # will run in a separate thread; done is a threading.Event used to synchronize the threads
    for char in itertools.cycle(r'\|/-'):   # infinite loop, yielding one char at a time
        status = f'\r{char} {msg}'  # \r moves the cursor back to the start of the line
        print(status, end='', flush=True)   # print without a newline and flush immediately
        if done.wait(.1):   # returns True when the event is set by another thread; if the timeout elapses, it returns False. Effectively sets the frame rate
            break
    blanks = ' ' * len(status)  # overwrite the status line with spaces to clear it
    print(f'\r{blanks}\r', end='')


def slow()->int:
    time.sleep(3)
    return 42

# the supervisor and main functions
def supervisor() -> int:    # will return result of slow
    done = Event()  # Event object that co-ordinates the activities of main thread and spinner thread
    spinner = Thread(target=spin, args = ('thinking!', done))   # target is spin
    print(f'spinner object: {spinner}')
    spinner.start() # start the spinner, keeps on running animation
    result = slow() # slow function called, blocking main thread
    done.set()  # update the event status to True, useful to get out of for loop in spin
    spinner.join()  # wait until spinner finishes
    return result

def main()->None:
    result = supervisor()
    print(f'Answer: {result}')

if __name__ == '__main__':
    main()

Let’s implement the same with processes.

Spinner with Processes🔗

  • The multiprocessing package supports running concurrent tasks in separate Python processes instead of threads. When you create a multiprocessing.Process instance, a whole new Python interpreter is started as a child process in the background.
  • Since each Python process has its own GIL, this allows your program to use all available CPU cores—but that ultimately depends on the operating system scheduler.
import itertools
import time
from multiprocessing import Process, Event
from multiprocessing import synchronize

def spin(msg: str, done: synchronize.Event) -> None:
# [same code as previous]

def supervisor() -> int:
    done = Event()
    spinner = Process(target=spin, args = ('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    result = slow()
    done.set()
    spinner.join()
    return result

# [same code as previous]
  • The print call shows something like <Process name='Process-1' parent=14868 initial>, where 14868 is the process ID of the Python instance running spinner_proc.py.
  • The basic API of threading and multiprocessing are similar, but their implementation is very different, and multiprocessing has a much larger API to handle the added complexity of multiprocess programming.
  • For example, one challenge when converting from threads to processes is how to communicate between processes that are isolated by the operating system and can’t share Python objects. This means that objects crossing process boundaries have to be serialized and deserialized, which creates overhead (see the sketch below).
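
As a rough sketch of that serialization step (using pickle directly, which is what multiprocessing relies on under the hood):

import pickle

payload = {'numbers': list(range(1000)), 'label': 'job-1'}
raw = pickle.dumps(payload)     # Python object -> raw bytes, done for every object sent to another process
restored = pickle.loads(raw)    # raw bytes -> a new object in the receiving process
assert restored == payload      # equal in value, but a separate copy: processes never share the object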

Spinner with Coroutines🔗

  • Read in this order: supervisor, main, spin, slow.
import itertools
import asyncio

async def spin(msg: str) -> None:
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, end ='', flush=True)
        try:
            await asyncio.sleep(.1) # sleep without blocking other coroutines.
        except asyncio.CancelledError:
            break
    blanks = ' ' * len(status)  # overwrite the status line with spaces to clear it
    print(f'\r{blanks}\r', end = '')


async def slow()->int:
    await asyncio.sleep(3)
    return 42

async def supervisor() -> int:  # native coroutines are defined with async def
    spinner = asyncio.create_task(spin('thinking!'))    # schedules eventual execution of spin, immediately return instance of asyncio.Task
    print(f'spinner object: {spinner}')
    result = await slow()   # calls slow blocking supervisor till it returns
    spinner.cancel()    # raises CancelledError
    return result

def main()->None:
    result = asyncio.run(supervisor())  # starts the event loop to drive the coroutines that will set other coroutines in motion; main is blocked until supervisor returns
    print(f'Answer: {result}')

if __name__ == '__main__':
    main()
  • Interesting experiment: with the version of slow below, the spinner never appears. Why?
async def slow() -> int:
    time.sleep(3)   # blocking call: the single thread running the event loop is stuck here for 3 seconds
    return 42
  • To understand what is happening, recall that Python code using asyncio has only one flow of execution, unless you’ve explicitly started additional threads or processes. That means only one coroutine executes at any point in time. Concurrency is achieved by control passing from one coroutine to another.
  • When await transfers control to the slow coroutine, time.sleep blocks for 3 seconds; nothing else can happen in the program, because the main thread is blocked and it is the only thread.
  • Right after slow returns, the spinner task is cancelled. The flow of control never reached the body of the spin coroutine.
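
One possible workaround (a sketch assuming Python 3.9+, where asyncio.to_thread is available) is to hand the blocking call to a worker thread, so the event loop stays free to drive spin:

import asyncio
import time

async def slow() -> int:
    await asyncio.to_thread(time.sleep, 3)  # run the blocking call in a separate thread; the event loop keeps running
    return 42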

Supervisor Side-by-Side🔗

  • An asyncio.Task is roughly the equivalent of a threading.Thread.
  • A Task drives a coroutine object, and a Thread invokes a callable.
  • A coroutine yields control explicitly with the await keyword.
  • You don’t instantiate Task objects yourself, you get them by passing a coroutine to asyncio.create_task(…).
  • When asyncio.create_task(…) returns a Task object, it is already scheduled to run, but a Thread instance must be explicitly told to run by calling its start method.
  • In the threaded supervisor, slow is a plain function and is directly invoked by the main thread. In the asynchronous supervisor, slow is a coroutine driven by await.
  • There’s no API to terminate a thread from the outside; instead, you must send a signal—like setting the done Event object. For tasks, there is the Task.cancel() instance method, which raises CancelledError at the await expression where the coroutine body is currently suspended.
  • The supervisor coroutine must be started with asyncio.run in the main function.

The Real Impact of the GIL🔗

  • In all the implementations of the slow function, we could replace it with a call to a well-designed asynchronous network library, and the spinner would keep spinning until that request completes, because such libraries provide coroutines that yield control back to the event loop while waiting for the network.
  • NOTE: for CPU-intensive code the story is different, because the execution unit is busy computing. Take the example of a function that checks whether a number is prime:
    • In spinner_proc.py, replacing time.sleep(3) with is_prime(n): the spinner is controlled by a child process, so it keeps spinning while the primality test is computed by the parent process.
    • In spinner_thread.py: the spinner is controlled by a secondary thread, so it continues spinning while the primality test is computed by the main thread. The spinner keeps running because Python suspends the running thread every 5ms, making the GIL available to other pending threads. This has no visible impact on the running time of this specific example, because the spinner quickly iterates once and releases the GIL as it waits for the done event, so there is not much contention: there are only two threads, one CPU-intensive and the other not.
    • In spinner_asyncio.py, replacing await asyncio.sleep(3) with a call to is_prime(n): the spinner never appears, just as when we replaced it with time.sleep(3). The flow of control passes from supervisor to slow and then to is_prime. When is_prime returns, slow returns, supervisor resumes and cancels the spinner task before it ever executes. One way to keep the spinner alive is to rewrite is_prime as a coroutine that periodically calls asyncio.sleep(0) in an await expression to yield control back to the event loop (see the sketch below). However, that ends up slowing is_prime because of the overhead of passing control back and forth.
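
Here is a sketch of that idea (my own adaptation, not code from the original example): an asynchronous variant of is_prime that awaits asyncio.sleep(0) every so many iterations, yielding control back to the event loop at the cost of extra overhead:

import asyncio
import math

async def is_prime_async(n: int, yield_every: int = 100_000) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    root = math.isqrt(n)
    for count, i in enumerate(range(3, root + 1, 2)):
        if n % i == 0:
            return False
        if count % yield_every == 0:
            await asyncio.sleep(0)  # yield to the event loop so the spinner task gets a chance to run
    return True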

A Homegrown Process Pool🔗

Let’s write a program to check the primality of a sample of 20 integers, from 2 to 10^16 - 1.

In the sequential case, the total running time is approximately the sum of the times for the individual checks, because each number is checked separately, one after the other.

# primes.py
import math

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    root = math.isqrt(n)
    for i in range(3, root + 1, 2):
        if n % i == 0:
            return False
    return True


NUMBERS = [
            2,
            3333333333333333,
            4444444444444444,
            5555555555555555,
            6666666666666666,
            142702110479723,
            7777777777777777,
            299593572317531,
            9999999999999999,
            3333333333333301,
            3333335652092209,
            4444444488888889,
            4444444444444423,
            5555553133149889,
            5555555555555503,
            6666666666666719,
            6666667141414921,
            7777777536340681,
            7777777777777753,
            9999999999999917,
        ]
#!/usr/bin/env python3

"""
sequential.py: baseline for comparing sequential, multiprocessing,
and threading code for CPU-intensive work.
"""

from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class Result(NamedTuple):
    prime: bool
    elapsed: float

def check(n: int) -> Result:
    t0 = perf_counter()
    prime = is_prime(n)
    return Result(prime, perf_counter() - t0)

def main() -> None:
    print(f'Checking {len(NUMBERS)} numbers sequentially:')
    t0 = perf_counter()
    for n in NUMBERS:
        prime, elapsed = check(n)
        label = 'P' if prime else ' '
        print(f'{n:16}  {label} {elapsed:9.6f}s')

    elapsed = perf_counter() - t0
    print(f'Total time: {elapsed:.2f}s')

if __name__ == '__main__':
    main()
  • The benchmark takes about 14s on my laptop.

Process-Based Solution🔗

Let’s distribute the primality checks across cores: we’ll create a number of worker processes equal to the number of CPU cores.

# procs.py
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count  # multiprocessing mirrors the threading API, but runs workers in separate processes
from multiprocessing import queues # multiprocessing.queues has the SimpleQueue class we need for type hints

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple): # tuple representing Result
    n: int
    prime: bool
    elapsed: float

JobQueue = queues.SimpleQueue[int] # TypeAlias
ResultQueue = queues.SimpleQueue[PrimeResult] # TypeAlias

def check(n: int) -> PrimeResult:   # similar to sequential.py
    t0 = perf_counter()
    res = is_prime(n)
    return PrimeResult(n, res, perf_counter() - t0)

def worker(jobs: JobQueue, results: ResultQueue) -> None:   # worker gets a queue with numbers, and another to put results
    while n := jobs.get(): # 0 is like poison pill: signal for worker to finish
        results.put(check(n))  # check the number and enqueue a PrimeResult
    results.put(PrimeResult(0, False, 0.0)) # send back to main loop to indicate end of worker

def start_jobs(
    procs: int, jobs: JobQueue, results: ResultQueue    # procs is the number of processes that will check primes in parallel
) -> None:
    for n in NUMBERS:
        jobs.put(n) # enqueue the numbers to be checked in jobs
    for _ in range(procs):
        proc = Process(target=worker, args=(jobs, results)) # fork child process for each worker
        proc.start() # start child process
        jobs.put(0) # poison pill
# main of above program
def main() -> None:
    if len(sys.argv) < 2: # use cpu_count() unless a count is given on the command line
        procs = cpu_count()
    else:
        procs = int(sys.argv[1])

    print(f'Checking {len(NUMBERS)} numbers with {procs} processes:')
    t0 = perf_counter()
    jobs: JobQueue = SimpleQueue()  # create job queue
    results: ResultQueue = SimpleQueue()
    start_jobs(procs, jobs, results) # start jobs
    checked = report(procs, results) # retrieve result and display
    elapsed = perf_counter() - t0
    print(f'{checked} checks in {elapsed:.2f}s') # display all numbers checked

def report(procs: int, results: ResultQueue) -> int: # args are the number of procs and the queue from which to get results
    checked = 0
    procs_done = 0
    while procs_done < procs: # loop until all worker processes signal they are done
        n, prime, elapsed = results.get() # get one PrimeResult
        if n == 0: # if n is zero, one worker process has finished
            procs_done += 1
        else:
            checked += 1 
            label = 'P' if prime else ' '
            print(f'{n:16}  {label} {elapsed:9.6f}s')
    return checked

if __name__ == '__main__':
    main()
  • Notice how the total time is reduced to about 4.13s.
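
As an aside, the same fan-out can be written more compactly with concurrent.futures, mentioned earlier; this is a sketch of an alternative, not the code that was timed above:

# pool_sketch.py: hypothetical alternative using ProcessPoolExecutor instead of the homegrown pool
from concurrent.futures import ProcessPoolExecutor

from primes import is_prime, NUMBERS

def main() -> None:
    with ProcessPoolExecutor() as executor:  # defaults to one worker process per CPU core
        for n, prime in zip(NUMBERS, executor.map(is_prime, NUMBERS)):  # map preserves input order
            label = 'P' if prime else ' '
            print(f'{n:16}  {label}')

if __name__ == '__main__':
    main()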

Python in a Multicore World🔗

Herb Sutter commented : The major processor manufacturers and architectures, from Intel and AMD to Sparc and PowerPC, have run out of room with most of their traditional approaches to boosting CPU performance. Instead of driving clock speeds and straight-line instruction throughput ever higher, they are instead turning en masse to hyper-threading and multicore architectures.

What Sutter calls the “free lunch” was the trend of software getting faster with no additional developer effort because CPUs were executing sequential code faster, year after year. Since 2004, that is no longer true: clock speeds and execution optimizations reached a plateau, and now any significant increase in performance must come from leveraging multiple cores or hyperthreading, advances that only benefit code that is written for concurrent execution.

Python’s story started in the early 1990s, when CPUs were still getting exponentially faster at sequential code execution. There was no talk about multicore CPUs except in supercomputers back then. At the time, the decision to have a GIL was a no-brainer. The GIL makes the interpreter faster when running on a single core, and its implementation simpler. The GIL also makes it easier to write simple extensions through the Python/C API.

Despite the GIL, Python is thriving in applications that require concurrent or parallel execution, thanks to libraries and software architectures that work around the limitations of CPython.

System Administration🔗

Python is widely used to manage large fleets of servers, routers, load balancers, and network-attached storage (NAS). It’s also a leading option in software-defined networking (SDN) and ethical hacking. Major cloud service providers support Python through libraries and tutorials authored by the providers themselves or by their large communities of Python users.

In this domain, Python scripts automate configuration tasks by issuing commands to be carried out by the remote machines, so there are rarely CPU-bound operations to be done. Threads or coroutines are well suited for such jobs.

Beyond the standard library, there are popular Python-based projects to manage server clusters: tools like Ansible and Salt, as well as libraries like Fabric.

Data Science🔗

Data science—including artificial intelligence—and scientific computing are very well served by Python. Applications in these fields are compute-intensive, but Python users benefit from a vast ecosystem of numeric computing libraries written in C, C++, Fortran, Cython, etc.—many of which are able to leverage multicore machines, GPUs, and/or distributed parallel computing in heterogeneous clusters.

  • Project Jupyter : Two browser-based interfaces—Jupyter Notebook and JupyterLab—that allow users to run and document analytics code potentially running across the network on remote machines.
  • TensorFlow and PyTorch: the top two deep learning frameworks. Both are written in C++ and are able to leverage multiple cores, GPUs, and clusters.
  • Dask: parallel computing library that can farm out work to local processes or cluster of machines.

Server-Side Web/Mobile Development🔗

Python is widely used in web applications and for the backend APIs supporting mobile applications. How is it that Google, YouTube, Dropbox, Instagram, Quora, and Reddit—among others—managed to build Python server-side applications serving hundreds of millions of users 24x7?

  • At web scale, the key is an architecture that allows horizontal scaling. At that point, all systems are distributed systems, and no single programming language is likely to be the right choice for every part of the solution.
  • Distributed systems is a field of academic research, but fortunately some practitioners have written accessible books anchored on solid research and practical experience. One of them is Martin Kleppmann, the author of Designing Data-Intensive Applications (O’Reilly).

Consider the figure (the first of many architecture diagrams in Kleppmann’s book). Here are some components I’ve seen in Python engagements:

  • Application caches: memcached, Redis, Varnish
  • Relational databases: PostgreSQL, MySQL
  • Document databases: Apache CouchDB, MongoDB
  • Full-text indexes: Elasticsearch, Apache Solr
  • Message queues: RabbitMQ, Redis

Figure: architecture for a data system combining several components.

WSGI Application Servers🔗

WSGI—the Web Server Gateway Interface—is a standard API for a Python framework or application to receive requests from an HTTP server and send responses to it. WSGI application servers manage one or more processes running your application, maximizing the use of the available CPUs.
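
For reference, a WSGI application is just a callable that receives the request environ and a start_response callback; a minimal sketch (the name hello_app is mine):

def hello_app(environ, start_response):
    status = '200 OK'
    headers = [('Content-Type', 'text/plain; charset=utf-8')]
    start_response(status, headers)     # report status and headers back to the server
    return [b'Hello, WSGI!']            # the body is an iterable of bytes

Any of the servers listed below can run such a callable; with Gunicorn, for example, something like gunicorn mymodule:hello_app (assuming the code lives in mymodule.py).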

The best-known application servers in Python web projects are:

  • mod_wsgi
  • uWSGI
  • Gunicorn
  • NGINX Unit

Distributed Task Queues🔗

When the application server delivers a request to one of the Python processes running your code, your app needs to respond quickly: you want the process to be available to handle the next request as soon as possible. However, some requests demand actions that may take longer—for example, sending email or generating a PDF. That’s the problem that distributed task queues are designed to solve.

Celery and RQ are the best known open source task queues with Python APIs. Cloud providers also offer their own proprietary task queues.

These products wrap a message queue and offer a high-level API for delegating tasks to workers, possibly running on different machines.
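
As a sketch of what that high-level API looks like with Celery (the broker URL and the task are placeholders, not recommendations):

# tasks.py: a hypothetical Celery task; the Redis broker URL is just an example
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def send_welcome_email(user_id: int) -> None:
    ...  # talk to the mail service; may take a while, so it runs in a worker process

# In the web request handler, the producer only enqueues the task and returns right away:
#     send_welcome_email.delay(42)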

Quoting directly from Celery’s FAQ, here are some typical use cases:

  • Running something in the background. For example, to finish the web request as soon as possible, then update the user’s page incrementally. This gives the user the impression of good performance and “snappiness,” even though the real work might actually take some time.
  • Running something after the web request has finished.
  • Making sure something is done, by executing it asynchronously and using retries.
  • Scheduling periodic work.

Besides solving these immediate problems, task queues support horizontal scalability. Producers and consumers are decoupled: a producer doesn’t call a consumer, it puts a request in a queue. Consumers don’t need to know anything about the producers (but the request may include information about the producer, if an acknowledgment is required). Crucially, you can easily add more workers to consume tasks as demand grows. That’s why Celery and RQ are called distributed task queues.