6.2. Context Manager

6.2.1. Rationale

  • Files

  • Buffering data

  • Database connection

  • Database transactions

  • Database cursors

  • Locks

  • Network sockets

  • Network streams

  • HTTP sessions

6.2.2. Protocol

  • __enter__(self) -> self

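  • __exit__(self, *args) -> None (*args receives exc_type, exc_value, traceback; all three are None when the block exits without an exception)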

>>> class ContextManager:
...     def __enter__(self):
...         return self
...
...     def __exit__(self, *args):
...         return None
>>>
>>>
>>> with ContextManager() as cm:
...     print('Do something with `cm`')
Do something with `cm`

6.2.3. Example

>>> class MyClass:
...     def __enter__(self):
...         print('Entering the block')
...         return self
...
...     def __exit__(self, *args):
...         print('Exiting the block')
...         return None
...
...     def do_something(self):
...         print('I am inside')
>>>
>>>
>>> with MyClass() as my:
...     my.do_something()
Entering the block
I am inside
Exiting the block
>>> class Rocket:
...     def __enter__(self):
...         print('Launching')
...         return self
...
...     def __exit__(self, *args):
...         print('Landing')
...
...     def fly_to_space(self):
...         print('I am in space!')
>>>
>>>
>>> with Rocket() as rocket:
...     rocket.fly_to_space()
Launching
I am in space!
Landing

6.2.4. Inheritance

>>> from contextlib import ContextDecorator
>>> from time import time
>>>
>>>
>>> class Timeit(ContextDecorator):
...     def __enter__(self):
...         self.start = time()
...         return self
...
...     def __exit__(self, *args):
...         end = time()
...         print(f'Duration {end-self.start:.2f} seconds')
>>>
>>>
>>> @Timeit()
... def myfunction():
...     list(range(100_000_000))
>>>
>>>
>>> myfunction()  # doctest: +SKIP
Duration 3.90 seconds

6.2.5. Decorator

  • The decorated generator function is split at yield into a before part and an after part

  • Code before yield becomes __enter__()

  • Code after yield becomes __exit__()

>>> from contextlib import contextmanager
>>> from time import time
>>>
>>>
>>> @contextmanager
... def timeit():
...     start = time()
...     yield
...     end = time()
...     print(f'Duration {end-start:.4f} seconds')
>>>
>>>
>>> with timeit():  # doctest: +SKIP
...     list(range(100_000_000))
Duration 4.0250 seconds
>>> from contextlib import contextmanager
>>>
>>>
>>> @contextmanager
... def tag(name):
...     print(f'<{name}>')
...     yield
...     print(f'</{name}>')
>>>
>>>
>>> with tag('p'):
...     print('foo')
<p>
foo
</p>

6.2.6. Use Case - Files

>>> f = open('/tmp/myfile.txt')
>>>
>>> try:
...     content = f.read()
... finally:
...     f.close()
>>> with open('/tmp/myfile.txt') as f:
...     content = f.read()
Pseudocode: the table of open files has a fixed size, so files that are never closed eventually exhaust it

uint32_max = 4_294_967_295
char* file[uint32_max];

file[0] = '/tmp/myfile1.txt'
file[1] = '/tmp/myfile2.txt'
file[2] = '/tmp/myfile3.txt'
...
file[4_294_967_294] = '/tmp/myfileX.txt'
file[4_294_967_295] -> KernelPanic

6.2.7. Use Case - Database

>>> import sqlite3
>>>
>>>
>>> DATABASE = ':memory:'
>>>
>>> SQL_CREATE_TABLE = """
...     CREATE TABLE IF NOT EXISTS astronauts (
...         id INTEGER PRIMARY KEY AUTOINCREMENT,
...         firstname TEXT NOT NULL,
...         lastname TEXT NOT NULL,
...         age INTEGER
...     )
... """
>>>
>>> SQL_INSERT = """
...     INSERT INTO astronauts VALUES(NULL, :firstname, :lastname, :age)
... """
>>>
>>> SQL_SELECT = """
...     SELECT * FROM astronauts
... """
>>>
>>> DATA = [{'firstname': 'Jan', 'lastname': 'Twardowski', 'age': 44},
...         {'firstname': 'Mark', 'lastname': 'Watney', 'age': 33},
...         {'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 36}]
>>>
>>>
>>> with sqlite3.connect(DATABASE) as db:  # doctest: +ELLIPSIS
...     db.execute(SQL_CREATE_TABLE)
...     db.executemany(SQL_INSERT, DATA)
...     db.row_factory = sqlite3.Row
...
...     for row in db.execute(SQL_SELECT):
...         print(dict(row))
<sqlite3.Cursor object at 0x...>
<sqlite3.Cursor object at 0x...>
{'id': 1, 'firstname': 'Jan', 'lastname': 'Twardowski', 'age': 44}
{'id': 2, 'firstname': 'Mark', 'lastname': 'Watney', 'age': 33}
{'id': 3, 'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 36}

6.2.8. Use Case - Lock

>>> from threading import Lock
>>>
>>>
>>> lock = Lock()
>>> lock.acquire()
True
>>>
>>> try:
...     print('Critical section 1')
...     print('Critical section 2')
... finally:
...     lock.release()
Critical section 1
Critical section 2
>>> from threading import Lock
>>>
>>>
>>> lock = Lock()
>>>
>>> with lock:
...     print('Critical section 1')
...     print('Critical section 2')
Critical section 1
Critical section 2

6.2.9. Use Case - Microbenchmark

>>> # doctest: +SKIP
... from time import time
...
...
... class Timeit:
...     def __init__(self, name):
...         self.name = name
...
...     def __enter__(self):
...         self.start = time()
...         return self
...
...     def __exit__(self, *arg):
...         end = time()
...         print(f'Duration of {self.name} is {end - self.start:.2f} second')
...
...
... a = 1
... b = 2
... repetitions = int(1e7)
...
... with Timeit('f-string'):
...     for _ in range(repetitions):
...         f'{a}{b}'
...
... with Timeit('string concat'):
...     for _ in range(repetitions):
...         str(a) + str(b)
...
... with Timeit('str.format()'):
...     for _ in range(repetitions):
...         '{0}{1}'.format(a, b)
...
... with Timeit('str.format()'):
...     for _ in range(repetitions):
...         '{}{}'.format(a, b)
...
... with Timeit('str.format()'):
...     for _ in range(repetitions):
...         '{a}{b}'.format(a=a, b=b)
...
... with Timeit('%-style'):
...     for _ in range(repetitions):
...         '%s%s' % (a, b)
...
... with Timeit('%-style'):
...     for _ in range(repetitions):
...         '%d%d' % (a, b)
...
... with Timeit('%-style'):
...     for _ in range(repetitions):
...         '%f%f' % (a, b)
...
Duration of f-string is 2.94 second
Duration of string concat is 5.30 second
Duration of str.format() is 3.62 second
Duration of str.format() is 3.48 second
Duration of str.format() is 5.02 second
Duration of %-style is 2.60 second
Duration of %-style is 2.71 second
Duration of %-style is 4.02 second

6.2.10. Assignments

Code 6.43. Solution
"""
* Assignment: Protocol ContextManager File
* Complexity: easy
* Lines of code: 13 lines
* Time: 8 min

English:
    1. Define class `File` with parameter: `filename: str`
    2. `File` must implement Context Manager protocol
    3. `File` buffers lines added using `File.append(text: str)` method
    4. On `with` block exit, `File` class:
        a. Creates file (if not exists)
        b. Opens file
        c. Writes buffer to file
        d. Clears buffer
        e. Closes file
    5. Run doctests - all must succeed

Polish:
    1. Stwórz klasę `File` z parametrem: `filename: str`
    2. `File` ma implementować protokół Context Manager
    3. `File` buforuje linie dodawane za pomocą metody `File.append(text: str)`
    4. Na wyjściu z bloku `with`, klasa `File`:
        a. Tworzy plik (jeżeli nie istnieje)
        b. Otwiera plik
        c. Zapisuje bufor do pliku
        d. Czyści bufor
        e. Zamyka plik
    5. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * Append newline character (`\\n`) to the text before adding it to the buffer

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""


Code 6.44. Solution
"""
* Assignment: Protocol ContextManager Buffer
* Complexity: medium
* Lines of code: 21 lines
* Time: 13 min

English:
    1. Define class attribute `BUFFER_LIMIT: int = 100` bytes
    2. File has to be written to disk whenever the buffer reaches `BUFFER_LIMIT` bytes
    3. Writing and reading take time;
       how can the buffer be saved in the background
       while it can still be appended to?
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj klasowy atrybut `BUFFER_LIMIT: int = 100` bajtów
    2. Plik na dysku ma być zapisywany co `BUFFER_LIMIT` bajtów bufora
    3. Operacje zapisu i odczytu trwają, jak zrobić,
       aby do bufora podczas zapisu na dysk,
       nadal można było pisać?
    4. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * `sys.getsizeof(obj)` returns `obj` size in bytes

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'BUFFER_LIMIT')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.BUFFER_LIMIT == 100

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')
    ...    file.append('Three')
    ...    file.append('Four')
    ...    file.append('Five')
    ...    file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""

from sys import getsizeof


Code 6.45. Solution
"""
* Assignment: Protocol Context Manager AutoSave
* Complexity: hard
* Lines of code: 28 lines
* Time: 21 min

English:
    1. Define class configuration attribute `AUTOSAVE_SECONDS: float = 1.0`
    2. Save buffer content to file every `AUTOSAVE_SECONDS` seconds
    3. Writing and reading take time;
       how can the buffer be saved in the background
       while it can still be appended to?
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj klasowy atrybut konfiguracyjny `AUTOSAVE_SECONDS: float = 1.0`
    2. Zapisuj zawartość bufora do pliku co `AUTOSAVE_SECONDS` sekund
    3. Operacje zapisu i odczytu trwają, jak zrobić,
       aby do bufora podczas zapisu na dysk,
       nadal można było pisać?
    4. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * `from threading import Timer`
    * `timer = Timer(interval, function)`
    * `timer.start()`
    * `timer.cancel()`
    * `ctrl+c` or the stop button stops an infinite loop

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'AUTOSAVE_SECONDS')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.AUTOSAVE_SECONDS == 1.0

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')
    ...    file.append('Three')
    ...    file.append('Four')
    ...    file.append('Five')
    ...    file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""

from threading import Timer