13.6. AsyncIO Run

  • asyncio.run() is a main entrypoint

  • asyncio.gather() can run concurrently and gather result (in order of its arguments)

In Python, asyncio is a module that provides tools for writing asynchronous code using coroutines. The asyncio.run() function is a utility function that is used to run the main entry point of an asyncio program.

The asyncio.run() function was introduced in Python 3.7 as a simple way to run an asyncio program. It creates an event loop, runs the coroutine passed to it, and then closes the event loop.

Here is an example of how to use asyncio.run():

>>> import asyncio
>>>
>>> async def my_coroutine():
...     await asyncio.sleep(5)
...     print("Task complete!")
>>>
>>> async def main():
...     await my_coroutine()
>>>
>>> asyncio.run(main())
Task complete!

In this example, the my_coroutine function is a coroutine that sleeps for 5 seconds and then prints "Task complete!". The main function calls my_coroutine using the await keyword. Finally, the asyncio.run() function is used to run the main coroutine.

When asyncio.run() is called, it creates an event loop, runs the main coroutine, and then closes the event loop. In this case, the my_coroutine function will be executed, causing the program to sleep for 5 seconds before printing "Task complete!".

The asyncio.run() function is a convenient way to run an asyncio program without having to manually create and manage an event loop. It simplifies the process of writing asynchronous code in Python.

13.6.1. SetUp

>>> import asyncio

13.6.2. Run Coroutine

  • asyncio.run(coro, *, debug=False)

  • Execute the coroutine coro and return the result

  • Takes care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool

  • Cannot be called when another asyncio event loop is running in the same thread

  • Always creates a new event loop and closes it at the end

  • It should be used as a main entry point for asyncio programs, and should ideally only be called once

>>> async def hello():
...     print('hello')
>>>
>>>
>>> asyncio.run(hello())
hello

13.6.3. Run Sequentially

>>> async def hello():
...     print('hello')
>>>
>>>
>>> async def main():
...     await hello()
...     await hello()
...     await hello()
>>>
>>>
>>> asyncio.run(main())
hello
hello
hello

13.6.4. Run Concurrently

  • awaitable asyncio.gather(*aws, return_exceptions=False)

  • Run awaitable objects in the aws sequence concurrently

  • If any awaitable in aws is a coroutine, it is automatically scheduled as a Task

  • If all awaitables are completed successfully, the result is an aggregate list of returned values

  • The order of result values corresponds to the order of awaitables in aws

  • return_exceptions=False (default): the first raised exception is immediately propagated to the task that awaits on gather(); other awaitables in the aws sequence won't be cancelled and will continue to run

  • return_exceptions=True: exceptions are treated the same as successful results, and aggregated in the result list

  • If gather() is cancelled (ie. on timeout), all submitted awaitables (that have not completed yet) are also cancelled

  • If any Task or Future from the aws sequence is cancelled, it is treated as if it raised CancelledError – the gather() call is not cancelled in this case

  • This is to prevent the cancellation of one submitted Task/Future to cause other Tasks/Futures to be cancelled

>>> async def a():
...     print('a: started')
...     await asyncio.sleep(0.2)
...     print('a: finished')
...     return 'a'
>>>
>>> async def b():
...     print('b: started')
...     await asyncio.sleep(0.1)
...     print('b: finished')
...     return 'b'
>>>
>>> async def c():
...     print('c: started')
...     await asyncio.sleep(0.3)
...     print('c: finished')
...     return 'c'
>>>
>>>
>>> async def main():
...     result = await asyncio.gather(a(), b(), c())
...     print(f'Result: {result}')
>>>
>>>
>>> asyncio.run(main())
a: started
b: started
c: started
b: finished
a: finished
c: finished
Result: ['a', 'b', 'c']

13.6.5. Run as Completed

  • asyncio.as_completed(aws, *, timeout=None)

  • Run awaitable objects in the aws iterable concurrently

  • Return an iterator of coroutines

  • Each coroutine returned can be awaited to get the earliest next result from the iterable of the remaining awaitables

  • Raises asyncio.TimeoutError if the timeout occurs before all Futures are done

>>> async def a():
...     print('a: started')
...     await asyncio.sleep(0.2)
...     print('a: finished')
...     return 'a'
>>>
>>> async def b():
...     print('b: started')
...     await asyncio.sleep(0.1)
...     print('b: finished')
...     return 'b'
>>>
>>> async def c():
...     print('c: started')
...     await asyncio.sleep(0.3)
...     print('c: finished')
...     return 'c'
>>>
>>>
>>> async def main():
...     todo = [a(), b(), c()]
...     for coro in asyncio.as_completed(todo):
...         result = await coro
...         print(result)
>>>
>>>
>>> asyncio.run(main())  
a: started
c: started
b: started
b: finished
b
a: finished
a
c: finished
c

13.6.6. Run in Threads

  • coroutine asyncio.to_thread(func, /, *args, **kwargs)

  • Asynchronously run function func in a separate thread.

  • Any *args and **kwargs supplied for this function are directly passed to func.

  • Return a coroutine that can be awaited to get the eventual result of func.

  • This coroutine function is intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were ran in the main thread.

>>> import asyncio
>>> import time
>>>
>>>
>>> def work():
...     print(f'Work started {time.strftime("%X")}')
...     time.sleep(2)  # Blocking
...     print(f'Work done at {time.strftime("%X")}')
>>>
>>>
>>> async def main():
...     print(f'Started main at {time.strftime("%X")}')
...     await asyncio.gather(
...         asyncio.to_thread(work),
...         asyncio.sleep(1))
...     print(f'Finished main at {time.strftime("%X")}')
>>>
>>>
>>> asyncio.run(main())  
Started main at 22:53:40
Work started 22:53:40
Work done at 22:53:42
Finished main at 22:53:42

Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don't have one, asyncio.to_thread() can also be used for CPU-bound functions.

13.6.7. Assignments

Code 13.17. Solution
"""
* Assignment: OOP Async GatherMany
* Complexity: easy
* Lines of code: 2 lines
* Time: 3 min

English:
    1. Define:
        a. coroutine function `main()`
    2. After running coroutine should:
        a. execute coroutines a(), b() and c()
        b. gather their returned values
        c. return results
    3. Run doctests - all must succeed

Polish:
    1. Zdefiniuj:
        a. coroutine function `main()`
    2. Po uruchomieniu coroutine powinna:
        a. wykonać korutyny a(), b() i c()
        b. zebrać ich zwracane wartości
        c. zwrócić wyniki
    3. Uruchom doctesty - wszystkie muszą się powieść

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import iscoroutine, iscoroutinefunction
    >>> import asyncio

    >>> assert iscoroutinefunction(a)
    >>> assert iscoroutinefunction(b)
    >>> assert iscoroutinefunction(c)
    >>> assert iscoroutine(a())
    >>> assert iscoroutine(b())
    >>> assert iscoroutine(c())

    >>> assert iscoroutinefunction(main)
    >>> assert iscoroutine(main())

    >>> asyncio.run(main())
    a: before
    b: before
    c: before
    b: after
    a: after
    c: after
    ['a', 'b', 'c']
"""

import asyncio


async def a():
    print('a: before')
    await asyncio.sleep(1.0)
    print('a: after')
    return 'a'

async def b():
    print('b: before')
    await asyncio.sleep(0.5)
    print('b: after')
    return 'b'

async def c():
    print('c: before')
    await asyncio.sleep(1.5)
    print('c: after')
    return 'c'


# coroutine function `main()`
# execute coroutines a(), b() and c(); return gathered results
# type: Coroutine
def main():
    ...


Code 13.18. Solution
"""
* Assignment: OOP Async GatherParams
* Complexity: easy
* Lines of code: 9 lines
* Time: 5 min

English:
    1. Define:
        a. coroutine function `run()`
        a. coroutine function `main()`
    2. Coroutine `main()` should schedule `run()` 3 times with parameters:
        a. First: name=a, sleep=1.0
        b. Second: name=b, sleep=0.5
        c. Third: name=c, sleep=1.5
    3. Coroutine `main()` should return gathered results
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj:
        a. coroutine function `run()`
        a. coroutine function `main()`
    2. Korutyna `main()` powinna zaschedulować `run()` 3 razy z parametrami:
        a. Pierwsze: name=a, sleep=1.0
        b. Drugie: name=b, sleep=0.5
        c. Trzecie: name=c, sleep=1.5
    3. Uruchom doctesty - wszystkie muszą się powieść

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import iscoroutine, iscoroutinefunction
    >>> import asyncio

    >>> assert iscoroutinefunction(run)
    >>> assert iscoroutine(run(None,0))

    >>> assert iscoroutinefunction(main)
    >>> assert iscoroutine(main())

    >>> asyncio.run(main())
    ['a', 'b', 'c']
"""

import asyncio


# type: Coroutine
def run():
    ...


# coroutine function `main()`
# type: Coroutine
def main():
    ...


Code 13.19. Solution
"""
* Assignment: OOP Async Fetch
* Complexity: easy
* Lines of code: 7 lines
* Time: 8 min

English:
    1. Define:
        a. coroutine function `check()`
        a. coroutine function `main()`
    2. Coroutine `check()` should use coroutine `fetch()` to download html
    3. Coroutine `check()` should check if string 'CC-BY-SA' is in html
    4. Coroutine `main()` should schedule `check()` for each URL in DATA
    5. Coroutine `main()` should return gathered results as list[dict]:
       [{'url': 'https://python3.info', 'license': True},
        {'url': 'https://python3.info/index.html', 'license': True},
        {'url': 'https://python3.info/about.html', 'license': False},
        {'url': 'https://python3.info/LICENSE.html', 'license': True}]
    6. Run doctests - all must succeed

Polish:
    1. Zdefiniuj:
        a. coroutine function `check()`
        a. coroutine function `main()`
    2. Korutyna `check` powinna użyć korutyny `fetch()` aby ściągnąć html
    3. Korutyna `check()` powinna sprawdzać czy string 'CC-BY-SA' jest w htmlu
    4. Korutyna `main()` powinna zaschedulować `check()` dla każdego URL w DATA
    5. Korutyna `main()` powinna zwrócić zebrane wyniki jako list[dict]:
       [{'url': 'https://python3.info', 'license': True},
        {'url': 'https://python3.info/index.html', 'license': True},
        {'url': 'https://python3.info/about.html', 'license': False},
        {'url': 'https://python3.info/LICENSE.html', 'license': True}]

    6. Uruchom doctesty - wszystkie muszą się powieść

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import iscoroutine, iscoroutinefunction
    >>> import asyncio

    >>> assert iscoroutinefunction(fetch)
    >>> assert iscoroutine(fetch(''))

    >>> assert iscoroutinefunction(check)
    >>> assert iscoroutine(check(''))

    >>> assert iscoroutinefunction(main)
    >>> assert iscoroutine(main())

    >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
    [{'url': 'https://python3.info', 'license': True},
     {'url': 'https://python3.info/index.html', 'license': True},
     {'url': 'https://python3.info/about.html', 'license': False},
     {'url': 'https://python3.info/LICENSE.html', 'license': True}]
"""

import asyncio
from httpx import AsyncClient


DATA = [
    'https://python3.info',
    'https://python3.info/index.html',
    'https://python3.info/about.html',
    'https://python3.info/LICENSE.html',
]


async def fetch(url):
    return await AsyncClient().get(url)