13.6. AsyncIO Run

  • asyncio.run() is the main entry point of an asyncio program

  • asyncio.gather() runs awaitables concurrently and gathers their results (in the order of its arguments)

13.6.1. SetUp

>>> import asyncio

13.6.2. Run Coroutine

  • asyncio.run(coro, *, debug=False)

  • Execute the coroutine coro and return the result

  • Takes care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool

  • Cannot be called when another asyncio event loop is running in the same thread

  • Always creates a new event loop and closes it at the end

  • It should be used as a main entry point for asyncio programs, and should ideally only be called once

>>> async def hello():
...     print('hello')
>>>
>>>
>>> asyncio.run(hello())
hello
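
Two points from the list above can be sketched in a few lines: asyncio.run() returns whatever the coroutine returns, and calling it while another event loop is already running in the same thread raises RuntimeError. The coroutine names answer() and nested() are hypothetical and exist only for this illustration.

```python
import asyncio


async def answer():
    # Hypothetical coroutine used only for this illustration
    await asyncio.sleep(0)
    return 42


async def nested():
    # An event loop is already running here, so asyncio.run() refuses to start
    coro = answer()
    try:
        asyncio.run(coro)
    except RuntimeError as error:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return type(error).__name__


result = asyncio.run(answer())
print(result)   # 42

outcome = asyncio.run(nested())
print(outcome)  # RuntimeError
```

Inside a coroutine, use await (or asyncio.gather()) instead of a nested asyncio.run() call.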

13.6.3. Run Sequentially

>>> async def hello():
...     print('hello')
>>>
>>>
>>> async def main():
...     await hello()
...     await hello()
...     await hello()
>>>
>>>
>>> asyncio.run(main())
hello
hello
hello
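
Because each await finishes before the next one starts, the total time of a sequential run is roughly the sum of the individual delays. The sketch below measures this, assuming a hypothetical step() coroutine that only sleeps and returns its name.

```python
import asyncio
import time


async def step(name, delay):
    # Hypothetical coroutine: simulates waiting for IO, then returns its name
    await asyncio.sleep(delay)
    return name


async def main():
    start = time.perf_counter()
    # Each await completes before the next coroutine is even started
    results = [
        await step('first', 0.1),
        await step('second', 0.1),
        await step('third', 0.1),
    ]
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)
print(f'Elapsed: {elapsed:.2f} s')  # roughly 0.3 s, the sum of all delays
```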

13.6.4. Run Concurrently

  • awaitable asyncio.gather(*aws, return_exceptions=False)

  • Run awaitable objects in the aws sequence concurrently

  • If any awaitable in aws is a coroutine, it is automatically scheduled as a Task

  • If all awaitables are completed successfully, the result is an aggregate list of returned values

  • The order of result values corresponds to the order of awaitables in aws

  • return_exceptions=False (default): the first raised exception is immediately propagated to the task that awaits on gather(); other awaitables in the aws sequence won't be cancelled and will continue to run

  • return_exceptions=True: exceptions are treated the same as successful results, and aggregated in the result list

  • If gather() is cancelled (e.g. on timeout), all submitted awaitables (that have not completed yet) are also cancelled

  • If any Task or Future from the aws sequence is cancelled, it is treated as if it raised CancelledError – the gather() call is not cancelled in this case

  • This prevents the cancellation of one submitted Task/Future from causing other Tasks/Futures to be cancelled

>>> async def a():
...     print('a: started')
...     await asyncio.sleep(0.2)
...     print('a: finished')
...     return 'a'
>>>
>>> async def b():
...     print('b: started')
...     await asyncio.sleep(0.1)
...     print('b: finished')
...     return 'b'
>>>
>>> async def c():
...     print('c: started')
...     await asyncio.sleep(0.3)
...     print('c: finished')
...     return 'c'
>>>
>>>
>>> async def main():
...     result = await asyncio.gather(a(), b(), c())
...     print(f'Result: {result}')
>>>
>>>
>>> asyncio.run(main())
a: started
b: started
c: started
b: finished
a: finished
c: finished
Result: ['a', 'b', 'c']
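
The effect of the return_exceptions parameter can be sketched as follows. The coroutines ok() and fail() are hypothetical helpers written for this illustration. With the default setting the first exception propagates to the caller of gather(). With return_exceptions=True the exception object is placed in the result list at the position of the awaitable that raised it.

```python
import asyncio


async def ok(name, delay):
    # Hypothetical coroutine that succeeds after a short delay
    await asyncio.sleep(delay)
    return name


async def fail(delay):
    # Hypothetical coroutine that raises after a short delay
    await asyncio.sleep(delay)
    raise ValueError('boom')


async def strict():
    # Default return_exceptions=False: the first exception is propagated
    try:
        await asyncio.gather(ok('a', 0.02), fail(0.01), ok('c', 0.03))
    except ValueError as error:
        return f'propagated: {error}'


async def lenient():
    # return_exceptions=True: exceptions are collected like ordinary results
    return await asyncio.gather(
        ok('a', 0.02),
        fail(0.01),
        ok('c', 0.03),
        return_exceptions=True,
    )


strict_result = asyncio.run(strict())
print(strict_result)    # propagated: boom

lenient_result = asyncio.run(lenient())
print(lenient_result)   # ['a', ValueError('boom'), 'c']
```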

13.6.5. Run as Completed

  • asyncio.as_completed(aws, *, timeout=None)

  • Run awaitable objects in the aws iterable concurrently

  • Return an iterator of coroutines

  • Each coroutine returned can be awaited to get the earliest next result from the iterable of the remaining awaitables

  • Raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11) if the timeout occurs before all Futures are done

>>> async def a():
...     print('a: started')
...     await asyncio.sleep(0.2)
...     print('a: finished')
...     return 'a'
>>>
>>> async def b():
...     print('b: started')
...     await asyncio.sleep(0.1)
...     print('b: finished')
...     return 'b'
>>>
>>> async def c():
...     print('c: started')
...     await asyncio.sleep(0.3)
...     print('c: finished')
...     return 'c'
>>>
>>>
>>> async def main():
...     todo = [a(), b(), c()]
...     for coro in asyncio.as_completed(todo):
...         result = await coro
...         print(result)
>>>
>>>
>>> asyncio.run(main())  
a: started
c: started
b: started
b: finished
b
a: finished
a
c: finished
c
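
The timeout parameter can be sketched in the same way. The job() coroutine and its delays are assumptions chosen for this illustration: one job finishes well within the timeout and the other does not, so awaiting the second item raises asyncio.TimeoutError.

```python
import asyncio


async def job(name, delay):
    # Hypothetical coroutine: sleeps for `delay` seconds and returns its name
    await asyncio.sleep(delay)
    return name


async def main():
    done = []
    todo = [job('fast', 0.01), job('slow', 1.0)]
    try:
        for coro in asyncio.as_completed(todo, timeout=0.2):
            done.append(await coro)
    except asyncio.TimeoutError:
        # 'slow' did not finish within 0.2 s
        done.append('timeout')
    return done


results = asyncio.run(main())
print(results)  # ['fast', 'timeout']
```

The timeout does not cancel the unfinished job itself. Here it is cancelled only when asyncio.run() shuts the event loop down.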

13.6.6. Run in Threads

  • coroutine asyncio.to_thread(func, /, *args, **kwargs)

  • Asynchronously run function func in a separate thread.

  • Any *args and **kwargs supplied for this function are directly passed to func.

  • Return a coroutine that can be awaited to get the eventual result of func.

  • This coroutine function is intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread.

>>> import asyncio
>>> import time
>>>
>>>
>>> def work():
...     print(f'Work started {time.strftime("%X")}')
...     time.sleep(2)  # Blocking
...     print(f'Work done at {time.strftime("%X")}')
>>>
>>>
>>> async def main():
...     print(f'Started main at {time.strftime("%X")}')
...     await asyncio.gather(
...         asyncio.to_thread(work),
...         asyncio.sleep(1))
...     print(f'Finished main at {time.strftime("%X")}')
>>>
>>>
>>> asyncio.run(main())  
Started main at 22:53:40
Work started 22:53:40
Work done at 22:53:42
Finished main at 22:53:42

Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don't have one, asyncio.to_thread() can also be used for CPU-bound functions.
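
Passing arguments and collecting return values can be sketched as below. The function blocking_add() is a hypothetical stand-in for a blocking IO-bound call. Positional and keyword arguments given to asyncio.to_thread() are forwarded to it, and awaiting the coroutine yields its return value. This assumes Python 3.9 or newer, where asyncio.to_thread() is available.

```python
import asyncio
import time


def blocking_add(a, b, *, delay=0.1):
    # Hypothetical blocking function standing in for IO-bound work
    time.sleep(delay)
    return a + b


async def main():
    start = time.perf_counter()
    # Both calls run in separate threads, so their delays overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_add, 1, 2),
        asyncio.to_thread(blocking_add, 10, 20, delay=0.1),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)  # [3, 30]
print(f'Elapsed: {elapsed:.2f} s')
```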

13.6.7. Assignments

Code 13.35. Solution
"""
* Assignment: OOP Async GatherMany
* Complexity: easy
* Lines of code: 2 lines
* Time: 3 min

English:
    1. Define:
        a. coroutine function `main()`
    2. After running, the coroutine should:
        a. execute coroutines a(), b() and c()
        b. gather their returned values
        c. return results
    3. Run doctests - all must succeed

Polish:
    1. Zdefiniuj:
        a. coroutine function `main()`
    2. Po uruchomieniu coroutine powinna:
        a. wykonać korutyny a(), b() i c()
        b. zebrać ich zwracane wartości
        c. zwrócić wyniki
    3. Uruchom doctesty - wszystkie muszą się powieść

    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import iscoroutine, iscoroutinefunction
    >>> import asyncio

    >>> assert iscoroutinefunction(a)
    >>> assert iscoroutinefunction(b)
    >>> assert iscoroutinefunction(c)
    >>> assert iscoroutine(a())
    >>> assert iscoroutine(b())
    >>> assert iscoroutine(c())

    >>> assert iscoroutinefunction(main)
    >>> assert iscoroutine(main())

    >>> asyncio.run(main())
    a: before
    b: before
    c: before
    b: after
    a: after
    c: after
    ['a', 'b', 'c']
"""

import asyncio


async def a():
    print('a: before')
    await asyncio.sleep(1.0)
    print('a: after')
    return 'a'

async def b():
    print('b: before')
    await asyncio.sleep(0.5)
    print('b: after')
    return 'b'

async def c():
    print('c: before')
    await asyncio.sleep(1.5)
    print('c: after')
    return 'c'


# coroutine function `main()`
# execute coroutines a(), b() and c(); return gathered results
# type: Coroutine
def main():
    ...


Code 13.36. Solution
"""
* Assignment: OOP Async GatherParams
* Complexity: easy
* Lines of code: 9 lines
* Time: 5 min

English:
    1. Define:
        a. coroutine function `run()`
        b. coroutine function `main()`
    2. Coroutine `main()` should schedule `run()` 3 times with parameters:
        a. First: name=a, sleep=1.0
        b. Second: name=b, sleep=0.5
        c. Third: name=c, sleep=1.5
    3. Coroutine `main()` should return gathered results
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj:
        a. coroutine function `run()`
        b. coroutine function `main()`
    2. Korutyna `main()` powinna zaschedulować `run()` 3 razy z parametrami:
        a. Pierwsze: name=a, sleep=1.0
        b. Drugie: name=b, sleep=0.5
        c. Trzecie: name=c, sleep=1.5
    3. Korutyna `main()` powinna zwrócić zebrane wyniki
    4. Uruchom doctesty - wszystkie muszą się powieść

    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import iscoroutine, iscoroutinefunction
    >>> import asyncio

    >>> assert iscoroutinefunction(run)
    >>> assert iscoroutine(run(None,0))

    >>> assert iscoroutinefunction(main)
    >>> assert iscoroutine(main())

    >>> asyncio.run(main())
    ['a', 'b', 'c']
"""

import asyncio


# type: Coroutine
def run():
    ...


# coroutine function `main()`
# type: Coroutine
def main():
    ...


Code 13.37. Solution
"""
* Assignment: OOP Async Fetch
* Complexity: easy
* Lines of code: 7 lines
* Time: 8 min

English:
    1. Define:
        a. coroutine function `check()`
        b. coroutine function `main()`
    2. Coroutine `check()` should use coroutine `fetch()` to download html
    3. Coroutine `check()` should check if string 'CC-BY-SA' is in html
    4. Coroutine `main()` should schedule `check()` for each URL in DATA
    5. Coroutine `main()` should return gathered results as list[dict]:
       [{'url': 'https://python3.info', 'license': True},
        {'url': 'https://python3.info/index.html', 'license': True},
        {'url': 'https://python3.info/about.html', 'license': False},
        {'url': 'https://python3.info/LICENSE.html', 'license': True}]
    6. Run doctests - all must succeed

Polish:
    1. Zdefiniuj:
        a. coroutine function `check()`
        b. coroutine function `main()`
    2. Korutyna `check()` powinna użyć korutyny `fetch()` aby ściągnąć html
    3. Korutyna `check()` powinna sprawdzać czy string 'CC-BY-SA' jest w htmlu
    4. Korutyna `main()` powinna zaschedulować `check()` dla każdego URL w DATA
    5. Korutyna `main()` powinna zwrócić zebrane wyniki jako list[dict]:
       [{'url': 'https://python3.info', 'license': True},
        {'url': 'https://python3.info/index.html', 'license': True},
        {'url': 'https://python3.info/about.html', 'license': False},
        {'url': 'https://python3.info/LICENSE.html', 'license': True}]
    6. Uruchom doctesty - wszystkie muszą się powieść

    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import iscoroutine, iscoroutinefunction
    >>> import asyncio

    >>> assert iscoroutinefunction(fetch)
    >>> assert iscoroutine(fetch(''))

    >>> assert iscoroutinefunction(check)
    >>> assert iscoroutine(check(''))

    >>> assert iscoroutinefunction(main)
    >>> assert iscoroutine(main())

    >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
    [{'url': 'https://python3.info', 'license': True},
     {'url': 'https://python3.info/index.html', 'license': True},
     {'url': 'https://python3.info/about.html', 'license': False},
     {'url': 'https://python3.info/LICENSE.html', 'license': True}]
"""

import asyncio
from httpx import AsyncClient


DATA = [
    'https://python3.info',
    'https://python3.info/index.html',
    'https://python3.info/about.html',
    'https://python3.info/LICENSE.html',
]


async def fetch(url):
    # Close the client (and its connections) once the request is done
    async with AsyncClient() as client:
        return await client.get(url)