7.2. Context Manager

7.2.1. Rationale

  • Files

  • Buffering data

  • Database connection

  • Database transactions

  • Database cursors

  • Locks

  • Network sockets

  • Network streams

  • HTTP sessions

7.2.2. Protocol

  • __enter__(self) -> self

  • __exit__(self, *args) -> None

class ContextManager:
    def __enter__(self):
        return self

    def __exit__(self, *args):
        return None


with ContextManager() as cm:
    ...
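
The `*args` accepted by `__exit__` stand for three values: the exception type, the exception instance and the traceback. All three are `None` when the block finishes without an error. The sketch below uses a hypothetical class named `Suppress` (not part of the examples above) to show that a truthy return value from `__exit__` suppresses the exception, while a falsy one lets it propagate:

```python
class Suppress:
    """Hypothetical example class, written only for this illustration."""

    def __init__(self, exception):
        self.exception = exception
        self.caught = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # All three arguments are None if the block finished without error
        if exc_type is not None and issubclass(exc_type, self.exception):
            self.caught = exc_value
            return True    # truthy: the exception is suppressed
        return False       # falsy: the exception (if any) propagates


with Suppress(ZeroDivisionError) as cm:
    1 / 0

print(type(cm.caught).__name__)
# ZeroDivisionError
```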

7.2.3. Example

class MyClass:
    def __enter__(self):
        print('Entering the block')
        return self

    def __exit__(self, *args):
        print('Exiting the block')
        return None

    def do_something(self):
        print('I am inside')


with MyClass() as my:
    my.do_something()

# Entering the block
# I am inside
# Exiting the block


class Rocket:
    def __enter__(self):
        print('Launching')
        return self

    def __exit__(self, *args):
        print('Landing')

    def fly_to_space(self):
        print('I am in space!')


with Rocket() as rocket:
    rocket.fly_to_space()

# Launching
# I am in space!
# Landing

7.2.4. Inheritance

from contextlib import ContextDecorator
from time import time


class Timeit(ContextDecorator):
    def __enter__(self):
        self.start = time()
        return self

    def __exit__(self, *args):
        end = time()
        print(f'Duration {end-self.start:.2f} seconds')


@Timeit()
def myfunction():
    list(range(100_000_000))


myfunction()
# Duration 3.90 seconds
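
A class that inherits from `ContextDecorator` still works in a plain `with` statement, so one definition covers both uses. A small sketch, assuming a hypothetical `Trace` class that records events in a list instead of measuring time:

```python
from contextlib import ContextDecorator


class Trace(ContextDecorator):
    """Hypothetical example class that records entering and exiting."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append('enter')
        return self

    def __exit__(self, *args):
        self.events.append('exit')
        return False


trace = Trace()


@trace
def hello():
    trace.events.append('hello')


hello()                      # used as a decorator

with trace:                  # used as a context manager
    trace.events.append('block')

print(trace.events)
# ['enter', 'hello', 'exit', 'enter', 'block', 'exit']
```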

7.2.5. Decorator

  • The decorated generator function is split at yield

  • Code before yield becomes __enter__()

  • Code after yield becomes __exit__()

from contextlib import contextmanager
from time import time


@contextmanager
def timeit():
    start = time()
    yield
    end = time()
    print(f'Duration {end-start:.4f} seconds')


with timeit():
    list(range(100_000_000))

# Duration 4.0250 seconds


from contextlib import contextmanager


@contextmanager
def tag(name):
    print(f'<{name}>')
    yield
    print(f'</{name}>')


with tag('p'):
    print('foo')

# <p>
# foo
# </p>
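
In the generator form, the code after `yield` is skipped when the body of the `with` block raises, because the exception is re-raised inside the generator at the `yield`. Wrapping `yield` in `try`/`finally` keeps the cleanup unconditional. A sketch of the `tag` example rewritten this way; the `events` list is an assumption added here so that the order of steps can be inspected:

```python
from contextlib import contextmanager

events = []


@contextmanager
def tag(name):
    events.append(f'<{name}>')
    try:
        yield
    finally:
        # Runs even if the body of the with block raises
        events.append(f'</{name}>')


try:
    with tag('p'):
        raise ValueError('broken content')
except ValueError:
    events.append('error handled')

print(events)
# ['<p>', '</p>', 'error handled']
```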

7.2.6. Use Case - Files

f = open(FILE)

try:
    content = f.read()
finally:
    f.close()


with open(FILE) as f:
    content = f.read()
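

# Pseudocode: the operating system keeps a finite table of open files,
# so files that are never closed eventually exhaust it
uint32_max = 4_294_967_295
char* file[uint32_max];

file[0] = '/tmp/myfile1.txt'
file[1] = '/tmp/myfile2.txt'
file[2] = '/tmp/myfile3.txt'
...
file[4_294_967_294] = '/tmp/myfileX.txt'
file[4_294_967_295] -> KernelPanic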
7.2.7. Use Case - Database

import sqlite3

SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS astronauts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        firstname TEXT NOT NULL,
        lastname TEXT NOT NULL,
        age INTEGER
    )
"""

SQL_INSERT = """
    INSERT INTO astronauts VALUES(NULL, :firstname, :lastname, :age)
"""

SQL_SELECT = """
    SELECT * FROM astronauts
"""

DATA = [
    {'firstname': 'Jan', 'lastname': 'Twardowski', 'age': 44},
    {'firstname': 'Mark', 'lastname': 'Watney', 'age': 33},
    {'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 36},
]


with sqlite3.connect('/tmp/mydatabase.db') as db:
    db.execute(SQL_CREATE_TABLE)
    db.executemany(SQL_INSERT, DATA)
    db.row_factory = sqlite3.Row
    for row in db.execute(SQL_SELECT):
        print(dict(row))

# {'id': 1, 'firstname': 'Jan', 'lastname': 'Twardowski', 'age': 44}
# {'id': 2, 'firstname': 'Mark', 'lastname': 'Watney', 'age': 33}
# {'id': 3, 'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 36}
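
In the standard library, a `sqlite3` connection used as a context manager commits the transaction on success and rolls it back on error, but it does not close the connection. A sketch that combines it with `contextlib.closing`; the in-memory database and the two-column table are assumptions made for illustration:

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(':memory:')) as db:
    db.execute('CREATE TABLE astronauts (firstname TEXT, lastname TEXT)')

    try:
        with db:  # transaction scope: commit on success, rollback on error
            db.execute('INSERT INTO astronauts VALUES (?, ?)',
                       ('Mark', 'Watney'))
            raise RuntimeError('something went wrong')
    except RuntimeError:
        pass

    # The insert was rolled back, so the table is still empty
    count = db.execute('SELECT COUNT(*) FROM astronauts').fetchone()[0]

# closing() has closed the connection; further use raises an error
try:
    db.execute('SELECT 1')
    closed = False
except sqlite3.ProgrammingError:
    closed = True

print(count, closed)
# 0 True
```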

7.2.8. Use Case - Lock

from threading import Lock


lock = Lock()
lock.acquire()

try:
    print('Critical section 1')
    print('Critical section 2')
finally:
    lock.release()


from threading import Lock

lock = Lock()

with lock:
    print('Critical section 1')
    print('Critical section 2')
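
The `with lock:` form releases the lock even when the critical section raises, just like the `try`/`finally` version. A short sketch that checks this with `Lock.locked()`:

```python
from threading import Lock

lock = Lock()

try:
    with lock:
        held_inside = lock.locked()    # True: acquired on entry
        raise RuntimeError('failure in critical section')
except RuntimeError:
    pass

held_after = lock.locked()             # False: released on exit

print(held_inside, held_after)
# True False
```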

7.2.9. Use Case - Microbenchmark

from time import time


class Timeit:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time()
        return self

    def __exit__(self, *args):
        end = time()
        print(f'Duration of {self.name} is {end - self.start:.2f} seconds')


a = 1
b = 2
repetitions = int(1e7)

with Timeit('f-string'):
    for _ in range(repetitions):
        f'{a}{b}'

with Timeit('string concat'):
    for _ in range(repetitions):
        str(a) + str(b)

with Timeit('str.format() indexed'):
    for _ in range(repetitions):
        '{0}{1}'.format(a, b)

with Timeit('str.format() positional'):
    for _ in range(repetitions):
        '{}{}'.format(a, b)

with Timeit('str.format() keyword'):
    for _ in range(repetitions):
        '{a}{b}'.format(a=a, b=b)

with Timeit('%-style %s'):
    for _ in range(repetitions):
        '%s%s' % (a, b)

with Timeit('%-style %d'):
    for _ in range(repetitions):
        '%d%d' % (a, b)

with Timeit('%-style %f'):
    for _ in range(repetitions):
        '%f%f' % (a, b)

# Duration of f-string is 2.94 seconds
# Duration of string concat is 5.30 seconds
# Duration of str.format() indexed is 3.62 seconds
# Duration of str.format() positional is 3.48 seconds
# Duration of str.format() keyword is 5.02 seconds
# Duration of %-style %s is 2.60 seconds
# Duration of %-style %d is 2.71 seconds
# Duration of %-style %f is 4.02 seconds
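
`time()` reads the wall clock, which the system may adjust while the program runs. For short measurements the standard library also offers `time.perf_counter()`, a monotonic high-resolution clock. A sketch of a `Timeit` variant built on it; the `duration` attribute is an assumption added here so the result can be used after the block:

```python
from time import perf_counter


class Timeit:
    def __init__(self, name):
        self.name = name
        self.duration = None

    def __enter__(self):
        self.start = perf_counter()
        return self

    def __exit__(self, *args):
        # Store the result so it remains available after the block
        self.duration = perf_counter() - self.start
        print(f'Duration of {self.name} is {self.duration:.2f} seconds')


with Timeit('f-string') as timer:
    for _ in range(1000):
        f'{1}{2}'
```

The measured value depends on the machine, so no expected output is shown.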

7.2.10. Assignments

Code 7.16. Solution
"""
* Assignment: Protocol ContextManager File
* Complexity: easy
* Lines of code: 13 lines
* Time: 13 min

English:
    1. Define class `File` with parameter: `filename: str`
    2. `File` must implement Context Manager protocol
    3. `File` buffers lines added using `File.append(text: str)` method
    4. On `with` block exit, `File` creates and opens the file, then writes the buffer to it
    5. Run doctests - all must succeed

Polish:
    1. Stwórz klasę `File` z parametrem: `filename: str`
    2. `File` ma implementować protokół Context Manager
    3. `File` buforuje linie dodawane za pomocą metody `File.append(text: str)`
    4. Na wyjściu z bloku `with` klasa `File` tworzy, otwiera i zapisuje bufor do pliku
    5. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * Append newline character (`\n`) before adding to buffer

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""


Code 7.17. Solution
"""
* Assignment: Protocol ContextManager Buffer
* Complexity: medium
* Lines of code: 21 lines
* Time: 21 min

English:
    1. Define class configuration attribute `BUFFER_LIMIT: int = 100` bytes
    2. The buffer must be written to disk whenever it reaches `BUFFER_LIMIT` bytes
    3. Writing and reading take time. How can the buffer save data in the background while remaining usable?
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj klasowy atrybut konfiguracyjny `BUFFER_LIMIT: int = 100` bajtów
    2. Plik na dysku ma być zapisywany co X bajtów bufora
    3. Operacje zapisu i odczytu trwają, jak zrobić, aby do bufora podczas zapisu na dysk, nadal można było pisać?
    4. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * `sys.getsizeof(obj)` returns `obj` size in bytes

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'BUFFER_LIMIT')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.BUFFER_LIMIT == 100

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')
    ...    file.append('Three')
    ...    file.append('Four')
    ...    file.append('Five')
    ...    file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""

from sys import getsizeof


Code 7.18. Solution
"""
* Assignment: Protocol Context Manager AutoSave
* Complexity: hard
* Lines of code: 28 lines
* Time: 21 min

English:
    1. Define class configuration attribute `AUTOSAVE_SECONDS: float = 1.0`
    2. Save buffer content to file every `AUTOSAVE_SECONDS` seconds
    3. Writing and reading take time. How can the buffer save data in the background while remaining usable?
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj klasowy atrybut konfiguracyjny `AUTOSAVE_SECONDS: float = 1.0`
    2. Zapisuj zawartość bufora do pliku co `AUTOSAVE_SECONDS` sekund
    3. Operacje zapisu i odczytu trwają, jak zrobić, aby do bufora podczas zapisu na dysk, nadal można było pisać?
    4. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * `from threading import Timer`
    * `timer = Timer(interval, function)`
    * `timer.start()`
    * `timer.cancel()`
    * `ctrl+c` or stop button kills infinite loop

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'AUTOSAVE_SECONDS')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.AUTOSAVE_SECONDS == 1.0

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')
    ...    file.append('Three')
    ...    file.append('Four')
    ...    file.append('Five')
    ...    file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""

from threading import Timer