Asynchronous I/O With Python 3

In this tutorial you’ll go through a whirlwind tour of the asynchronous I/O facilities introduced in Python 3.4 and improved further in Python 3.5 and 3.6.

Python previously had few great options for asynchronous programming. The asyncio package finally brings first-class support: high-level APIs plus a standard foundation that aims to unify the existing solutions, most of them third-party (Twisted, Gevent, Tornado, and the standard library’s own asyncore).

It’s important to understand that learning Python’s async IO is not trivial due to the rapid iteration, the scope, and the need to provide a migration path to existing async frameworks. I’ll focus on the latest and greatest to simplify a little.

There are many moving parts that interact in interesting ways across thread boundaries, process boundaries, and remote machines. There are platform-specific differences and limitations. Let’s jump right in.

Pluggable Event Loops

The core concept of async IO is the event loop. In a program, there may be multiple event loops. Each thread will have at most one active event loop. The event loop provides the following facilities:

  • Registering, executing and cancelling delayed calls (with timeouts).
  • Creating client and server transports for various kinds of communication.
  • Launching subprocesses and the associated transports for communication with an external program.
  • Delegating costly function calls to a pool of threads.
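
To make two of these facilities concrete, here is a minimal sketch of a delayed call, a cancelled delayed call, and a blocking function handed to the default thread pool. The helper names record() and blocking_square() are made up for illustration, and the sketch creates its own loop with asyncio.new_event_loop() so it doesn’t depend on a current loop being set.

```python
import asyncio
import time

events = []  # records which delayed calls actually ran


def record(label):
    events.append(label)


def blocking_square(n):
    # Hypothetical stand-in for a costly, blocking call.
    time.sleep(0.05)
    return n * n


loop = asyncio.new_event_loop()
try:
    # Register two delayed calls, then cancel one before it fires.
    loop.call_later(0.01, record, "first")
    doomed = loop.call_later(0.02, record, "cancelled")
    doomed.cancel()

    # Passing None as the executor selects the loop's default thread pool.
    future = loop.run_in_executor(None, blocking_square, 7)
    result = loop.run_until_complete(future)
finally:
    loop.close()

print(events, result)
```

The cancelled handle never runs its callback, while the event loop stays free to fire the first timer during the time the worker thread is sleeping.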

Quick Example

Here is a little example that starts two coroutines and schedules a delayed function call that stops the loop. It shows how to use an event loop to power your program:

import asyncio


async def foo(delay):
    for i in range(10):
        print(i)
        await asyncio.sleep(delay)


def stopper(loop):
    loop.stop()


loop = asyncio.get_event_loop()

# Schedule a call to foo()
loop.create_task(foo(0.5))
loop.create_task(foo(1))
loop.call_later(12, stopper, loop)

# Block until loop.stop() is called
loop.run_forever()
loop.close()

The AbstractEventLoop class provides the basic contract for event loops. There are many things an event loop needs to support:

  • Scheduling functions and coroutines for execution
  • Creating futures and tasks
  • Managing TCP servers
  • Handling signals (on Unix)
  • Working with pipes and subprocesses

Here are the methods related to running and stopping the event loop, as well as scheduling functions and coroutines:

class AbstractEventLoop:
    """Abstract event loop."""

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args):
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError
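
As a rough illustration of the scheduling methods in this contract, the sketch below queues one callback with each of call_soon(), call_later() and call_at(), and checks the loop’s state before, during and after the run. The note() helper and the specific delays are my own choices for the example.

```python
import asyncio

loop = asyncio.new_event_loop()
order = []


def note(label):
    # Record the label and whether the loop reports itself as running.
    order.append((label, loop.is_running()))


# call_at() takes an absolute time on the loop's own clock, loop.time().
now = loop.time()
loop.call_at(now + 0.05, note, "call_at")
loop.call_later(0.01, note, "call_later")
loop.call_soon(note, "call_soon")
# Stop the loop a little after the last callback is due.
loop.call_later(0.1, loop.stop)

running_before = loop.is_running()  # nothing runs until run_forever()
loop.run_forever()
loop.close()
closed_after = loop.is_closed()

print(order)
```

The callbacks fire in order of their due time rather than in the order they were registered, which is why call_soon() comes first even though it was scheduled last.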

Plugging in a new Event Loop

Asyncio is designed to support multiple event loop implementations that adhere to its API. The key is the EventLoopPolicy class, which configures asyncio and lets you control every aspect of the event loop. Here is an example of a custom event loop called uvloop, based on libuv, which is supposed to be much faster than the alternatives (I haven’t benchmarked it myself):

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

That’s it. Now, whenever you use any asyncio function, it’s uvloop under the covers.

Coroutines, Futures, and Tasks

A coroutine is a loaded term. It is both a function that executes asynchronously and an object that needs to be scheduled. You define them by adding the async keyword before the definition:

import asyncio


async def cool_coroutine():
    return "So cool..."

If you call such a function, it doesn’t run. Instead, it returns a coroutine object, and if you don’t schedule it for execution then you’ll get a warning too:

c = cool_coroutine()
print(c)

Output:

<coroutine object cool_coroutine at 0x108a862b0>
sys:1: RuntimeWarning: coroutine 'cool_coroutine' was never awaited

To actually execute the coroutine, we need an event loop:

loop = asyncio.get_event_loop()
r = loop.run_until_complete(c)
loop.close()

print(r)

Output:

So cool...
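
If you want to check which of the two meanings of “coroutine” you are holding, the standard inspect module can tell them apart. This is a small sketch under the assumption that you are on Python 3.5 or later; it creates its own loop with asyncio.new_event_loop().

```python
import asyncio
import inspect


async def cool_coroutine():
    return "So cool..."


c = cool_coroutine()  # the body has not run yet

# The function and the object it returns are two different things.
is_function = inspect.iscoroutinefunction(cool_coroutine)
is_object = inspect.iscoroutine(c)

loop = asyncio.new_event_loop()
try:
    r = loop.run_until_complete(c)  # now the body actually executes
finally:
    loop.close()

print(is_function, is_object, r)
```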

That’s direct scheduling. You can also chain coroutines. Note that you have to use await when invoking one coroutine from another:

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

The asyncio Future class is similar to the concurrent.futures.Future class. It is not thread-safe and supports the following features:

  • adding and removing done callbacks
  • cancelling
  • setting results and exceptions
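
The three features can be sketched in a few lines. The callback names on_done() and unused() are invented for this example, and the futures are created with loop.create_future() on a loop made just for the sketch.

```python
import asyncio

loop = asyncio.new_event_loop()
seen = []


def on_done(fut):
    # A done callback receives the future itself as its only argument.
    seen.append("cancelled" if fut.cancelled() else "finished")


def unused(fut):
    seen.append("unused")


ok = loop.create_future()
ok.add_done_callback(on_done)
ok.add_done_callback(unused)
removed = ok.remove_done_callback(unused)  # number of callbacks removed
ok.set_result(42)

failed = loop.create_future()
failed.set_exception(ValueError("boom"))

dropped = loop.create_future()
dropped.add_done_callback(on_done)
dropped.cancel()

# Done callbacks are scheduled on the loop, so it has to run briefly.
loop.run_until_complete(asyncio.sleep(0))
error = failed.exception()
loop.close()

print(seen, ok.result(), repr(error))
```

Note that the done callbacks don’t run at the moment the result is set. They are queued on the event loop and only run once the loop gets a turn.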

Here is how to use a future with the event loop. The take_your_time() coroutine accepts a future and sets its result after sleeping for a second.

The ensure_future() function schedules the coroutine, and run_until_complete() waits for the future to be done. Behind the curtain, it adds a done callback to the future.

import asyncio

async def take_your_time(future):
    await asyncio.sleep(1)
    future.set_result(42)

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(take_your_time(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

This is pretty cumbersome. Asyncio provides tasks to make working with futures and coroutines more pleasant. A Task is a subclass of Future that wraps a coroutine and that you can cancel.

The coroutine doesn’t have to accept an explicit future and set its result or exception. Here is how to perform the same operations with a task:

import asyncio

async def take_your_time():
    await asyncio.sleep(1)
    return 42

loop = asyncio.get_event_loop()
task = loop.create_task(take_your_time())
loop.run_until_complete(task)
print(task.result())
loop.close()
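
Cancellation is the part that a plain coroutine doesn’t give you. Here is a hedged sketch of cancelling a task while it is sleeping; the 10-second sleep and the 0.01-second delay before cancelling are arbitrary values picked for the example.

```python
import asyncio


async def take_your_time():
    try:
        await asyncio.sleep(10)
        return 42
    except asyncio.CancelledError:
        # Cleanup would go here; re-raise so the task ends up cancelled.
        raise


loop = asyncio.new_event_loop()
task = loop.create_task(take_your_time())
loop.call_later(0.01, task.cancel)  # request cancellation shortly after start

try:
    loop.run_until_complete(task)
    outcome = "finished"
except asyncio.CancelledError:
    outcome = "cancelled"
finally:
    loop.close()

print(outcome, task.cancelled())
```

Calling cancel() only requests cancellation. The coroutine sees a CancelledError raised at its current await, and the task counts as cancelled once that exception propagates out.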

Transports, Protocols, and Streams

A transport is an abstraction of a communication channel. A transport always supports a particular protocol. Asyncio provides built-in implementations for TCP, UDP, SSL, and subprocess pipes.

If you’re familiar with socket-based network programming then you’ll feel right at home with transports and protocols. With Asyncio, you get asynchronous network programming in a standard way. Let’s look at the infamous echo server and client (the “hello world” of networking).

First, the echo client implements a class called EchoClient that is derived from the asyncio.Protocol. It keeps its event loop and a message it will send to the server upon connection.

In the connection_made() callback, it writes its message to the transport. In the data_received() method, it just prints the server’s response, and in the connection_lost() method it stops the event loop. The loop’s create_connection() method takes a factory that returns an EchoClient instance (here a lambda) and returns a coroutine that the loop runs until the connection is established.

import asyncio

class EchoClient(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClient(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

The server is similar except that it runs forever, waiting for clients to connect. After it sends an echo response, it also closes the connection to the client and is ready for the next client to connect.

A new instance of the EchoServer is created for each connection, so even if multiple clients connect at the same time, there will be no problem of conflicts with the transport attribute.

import asyncio

class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServer, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
print('Serving on {}'.format(server.sockets[0].getsockname()))
loop.run_forever()

Here is the output after two clients connected:

Serving on ('127.0.0.1', 8888)
Connection from ('127.0.0.1', 53248)
Data received: 'Hello World!'
Send: 'Hello World!'
Close the client socket
Connection from ('127.0.0.1', 53351)
Data received: 'Hello World!'
Send: 'Hello World!'
Close the client socket
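
Because a protocol is just a class with callbacks, you can exercise it without any network at all. The sketch below drives a trimmed EchoServer by hand with a stand-in transport. RecordingTransport is a hypothetical test double written for this example, not an asyncio class, and it mimics only write() and close().

```python
import asyncio


class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)  # echo the bytes back
        self.transport.close()      # then hang up


class RecordingTransport:
    """Hypothetical test double that records what the protocol does."""

    def __init__(self):
        self.written = []
        self.closed = False

    def write(self, data):
        self.written.append(data)

    def close(self):
        self.closed = True


# Two "connections" get two protocol instances, as the server would create.
first, second = EchoServer(), EchoServer()
t1, t2 = RecordingTransport(), RecordingTransport()
first.connection_made(t1)
second.connection_made(t2)
first.data_received(b"Hello")
second.data_received(b"World")

print(t1.written, t2.written)
```

Each instance keeps its own transport attribute, which is why simultaneous clients don’t step on each other.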

Streams provide a high-level API that is based on coroutines and provides Reader and Writer abstractions. The protocols and the transports are hidden, there is no need to define your own classes, and there are no callbacks. You just await events like connection and data received.

The client calls the open_connection() function, which returns a reader and a writer object that you use much like ordinary file objects. To close the connection, it closes the writer.

import asyncio


async def tcp_echo_client(message):
    # The loop argument was removed in Python 3.10; open_connection()
    # uses the running event loop on its own.
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = await reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()


message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message))
loop.close()

The server is also much simplified.

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
# start_server() no longer accepts a loop argument (removed in Python 3.10)
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
print('Serving on {}'.format(server.sockets[0].getsockname()))
loop.run_forever()
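
To try the streams API without juggling two terminals, you can run the server and the client in one event loop. This is a minimal sketch: the round_trip() coroutine is my own wrapper, and binding to port 0 (letting the operating system pick a free port) is an assumption made so the example doesn’t collide with anything already listening on 8888.

```python
import asyncio


async def handle_echo(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()


async def round_trip(message):
    # Port 0 asks the OS for any free port; read back the one it chose.
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(message.encode())
    await writer.drain()

    # read() with no size reads until the server closes the connection.
    data = await reader.read()
    writer.close()

    server.close()
    await server.wait_closed()
    return data.decode()


loop = asyncio.new_event_loop()
try:
    reply = loop.run_until_complete(round_trip('Hello World!'))
finally:
    loop.close()

print(reply)
```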

Working With Sub-Processes

Asyncio covers interactions with sub-processes too. The following program launches another Python process and executes the code “import this”. It is one of Python’s famous Easter eggs, and it prints the “Zen of Python”. Check out the output below.

The Python process is launched in the zen() coroutine using the create_subprocess_exec() function, with its standard output bound to a pipe. The coroutine then iterates over the standard output line by line, using await to give other coroutines a chance to execute if output is not ready yet.

Note that on Windows you have to set the event loop to the ProactorEventLoop because the SelectorEventLoop doesn’t support pipes. (Starting with Python 3.8, ProactorEventLoop is already the default on Windows.)

import asyncio.subprocess
import sys


async def zen():
    code = 'import this'
    create = asyncio.create_subprocess_exec(
        sys.executable, 
        '-c', 
        code,
        stdout=asyncio.subprocess.PIPE)
    proc = await create

    data = await proc.stdout.readline()
    while data:
        line = data.decode('ascii').rstrip()
        print(line)
        data = await proc.stdout.readline()

    await proc.wait()

if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

loop.run_until_complete(zen())

Output:

The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
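
When you don’t need the output line by line, the process object’s communicate() method collects everything in one go. The sketch below is one way to do it; run_python() is a helper name I made up, and the child program is a trivial print so the result is easy to check.

```python
import asyncio
import sys


async def run_python(code):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)
    # communicate() reads stdout to EOF and waits for the process to exit.
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().strip()


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
else:
    loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    returncode, output = loop.run_until_complete(run_python('print(6 * 7)'))
finally:
    loop.close()

print(returncode, output)
```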

Conclusion

Python’s asyncio is a comprehensive framework for asynchronous programming. It has a huge scope and offers both low-level and high-level APIs. It is still relatively young and not well understood by the community.

I’m confident that over time best practices will emerge, and more examples will surface and make it easier to use this powerful library.