9.3. Threading

thread
lock
daemon
worker
timer

9.3.1. Daemon

Some threads do background tasks, such as sending keepalive packets or performing periodic garbage collection. They are only useful while the main program is running, and it is safe to kill them once all other, non-daemon threads have exited.

Without daemon threads, you would have to keep track of these threads and tell them to exit before your program could quit completely. If you mark them as daemon threads, you can start them and forget about them: when your program quits, any remaining daemon threads are killed automatically. Note that daemon threads are stopped abruptly at shutdown, so resources they hold (such as open files) may not be released properly.
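
As a minimal sketch of the idea above, a thread can be marked as a daemon by passing `daemon=True` to `Thread`. The `keepalive()` function and its 0.1 second interval are hypothetical stand-ins for a real background task:

```python
from threading import Thread
from time import sleep


def keepalive():
    # Hypothetical background task: loops forever and never returns.
    while True:
        sleep(0.1)


# daemon=True marks the thread as a background (daemon) thread.
background = Thread(target=keepalive, daemon=True)
background.start()

print(background.daemon)      # True
print(background.is_alive())  # True

# When run as a script, the program exits here even though
# keepalive() never returns, because only daemon threads remain.
```

Without `daemon=True` the same script would never terminate, because the interpreter waits for every non-daemon thread to finish.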

9.3.2. Delay execution

  • why not time.sleep()

  • recursive timer
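
One way to see why `Timer` is often preferred over `time.sleep()` for delayed execution: `time.sleep()` blocks the calling thread for the whole delay, while `Timer.start()` returns immediately and runs the function later in a separate thread. The sketch below assumes a short 0.2 second delay and a hypothetical `events` list used only to record the order of execution:

```python
from threading import Timer

DELAY_SECONDS = 0.2
events = []  # hypothetical log of what ran, in order


def hello():
    events.append('timer')


timer = Timer(DELAY_SECONDS, hello)
timer.start()            # returns immediately, hello() runs later
events.append('main')    # the main thread keeps working in the meantime

timer.join()             # wait for the timer thread, only to show the result
print(events)            # ['main', 'timer']
```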

Delay execution:

from threading import Timer


DELAY_SECONDS = 5.0

def hello():
    print('Hello world!')


t = Timer(DELAY_SECONDS, hello)
t.start()

print('Main Thread')

Recurring timer:

from threading import Timer


DELAY_SECONDS = 5.0

def hello():
    print('Timer Thread')
    Timer(DELAY_SECONDS, hello).start()


t = Timer(DELAY_SECONDS, hello)
t.start()

print('Main Thread')
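
The recurring timer above reschedules itself indefinitely. A timer that has been started but has not fired yet can be stopped with `Timer.cancel()`. The following is a small sketch of that call; the `calls` list is a hypothetical helper used only to show that the function never ran:

```python
from threading import Timer

calls = []  # hypothetical record of executed calls


def hello():
    calls.append('hello')


t = Timer(0.5, hello)
t.start()
t.cancel()   # stop the timer while it is still waiting
t.join()     # the timer thread ends without calling hello()

print(calls)  # []
```

To stop a recurring timer, keep a reference to the most recently created `Timer` object and cancel that one.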

9.3.3. Creating Threads

from threading import Thread


class MyThread(Thread):
    def run(self):
        print('hello')


t = MyThread()
t.start()
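
Subclassing `Thread` is one option. For simple cases a plain function can be passed as `target`, with its arguments given in `args` and `kwargs`. The sketch below uses a hypothetical `greet()` function and a `results` list to collect output from the thread:

```python
from threading import Thread

results = []  # hypothetical container for values produced by the thread


def greet(name, punctuation='!'):
    results.append(f'hello {name}{punctuation}')


t = Thread(target=greet, args=('world',), kwargs={'punctuation': '?'})
t.start()
t.join()  # wait until greet() has finished

print(results)  # ['hello world?']
```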

9.3.4. Thread Synchronisation

from threading import Thread


class MyThread(Thread):
    def run(self):
        print('hello')


t1 = MyThread()
t1.start()

t2 = MyThread()
t2.start()

t1.join()
t2.join()

Tracking started threads in a list:

from threading import Thread

RUNNING = []


class MyThread(Thread):
    def run(self):
        print('hello')


t1 = MyThread()
t1.start()
RUNNING.append(t1)

t2 = MyThread()
t2.start()
RUNNING.append(t2)

for thread in RUNNING:
    thread.join()

Spawning threads with a helper function:

from threading import Thread

RUNNING = []


class MyThread(Thread):
    def run(self):
        print('hello')


def spawn(cls, count=1):
    for i in range(count):
        t = cls()
        t.start()
        RUNNING.append(t)


spawn(MyThread, count=10)


for thread in RUNNING:
    thread.join()
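
The examples above synchronise only on thread completion. When several threads modify the same data, a `Lock` can guard the critical section. The following is a minimal sketch, assuming a hypothetical shared `state` dictionary and four threads incrementing one counter:

```python
from threading import Lock, Thread

LOCK = Lock()
state = {'counter': 0}  # hypothetical shared data


def increment(times):
    for _ in range(times):
        # Only one thread at a time may execute the read-modify-write step.
        with LOCK:
            state['counter'] += 1


threads = [Thread(target=increment, args=(10_000,)) for _ in range(4)]

for t in threads:
    t.start()

for t in threads:
    t.join()

print(state['counter'])  # 40000
```

Using `with LOCK:` releases the lock even if the guarded code raises an exception.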

9.3.5. Joining Threads

Joining Threads:

from queue import Empty, Queue
from threading import Thread
from time import sleep


EXIT = False
TODO = Queue()
RUNNING = []


class MyThread(Thread):
    def run(self):
        while not EXIT:
            try:
                # Remove and return an item from the queue.
                # The timeout lets the loop re-check EXIT instead of
                # blocking forever on an empty queue.
                job = TODO.get(timeout=1)
            except Empty:
                continue

            # Execute work
            print(f'Will do the work: {job}')

            # Indicate that a formerly enqueued task is complete.
            TODO.task_done()
            sleep(1)

        print(f'Exiting {self.name}')


# Create new threads
def spawn_worker(count=1):
    for i in range(count):
        thread = MyThread()
        thread.start()
        RUNNING.append(thread)


if __name__ == '__main__':
    spawn_worker(5)

    # Fill the queue (Queue is thread-safe, no extra lock is needed)
    for task in ['One', 'Two', 'Three', 'Four', 'Five']:
        TODO.put(task)

    # Wait until every task has been marked as done
    TODO.join()

    # Notify threads it's time to exit
    EXIT = True

    # Wait for all threads to complete
    for thread in RUNNING:
        thread.join()

    print('Exiting Main Thread')
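
A module-level boolean flag works for signalling shutdown, but `threading.Event` expresses the same intent with an explicit, thread-safe object. The sketch below is one possible variant, not the only way to do it; the names `STOP`, `done` and the 0.05 second timeout are assumptions chosen for illustration:

```python
from queue import Empty, Queue
from threading import Event, Thread

STOP = Event()   # hypothetical shutdown signal
TODO = Queue()
done = []        # hypothetical record of finished jobs


def worker():
    while not STOP.is_set():
        try:
            # Time out periodically so the loop can notice STOP.
            job = TODO.get(timeout=0.05)
        except Empty:
            continue
        done.append(job)
        TODO.task_done()


threads = [Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for task in ['One', 'Two', 'Three']:
    TODO.put(task)

TODO.join()   # wait until every task is marked as done
STOP.set()    # tell the workers to leave their loops

for t in threads:
    t.join()

print(sorted(done))  # ['One', 'Three', 'Two']
```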

9.3.6. Workers

Worker model:

from queue import Queue
from threading import Thread

TODO = Queue()


class Worker(Thread):
    def run(self):
        while True:
            # Remove and return an item from the queue.
            job = TODO.get()

            # Execute work
            print(f'Will do the work: {job}')

            # Indicate that a formerly enqueued task is complete.
            TODO.task_done()


def spawn_worker(count=1):
    for i in range(count):
        # Daemon workers do not keep the program alive after TODO.join()
        Worker(daemon=True).start()


if __name__ == '__main__':
    spawn_worker(3)

    TODO.put('ping')
    TODO.put('ls -la')
    TODO.put('echo "hello world"')
    TODO.put('cat /etc/passwd')

    # wait to complete all tasks
    TODO.join()
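
Workers that loop with `while True` never return on their own. One common way to stop them cleanly is to enqueue a sentinel value for each worker. The sketch below assumes `None` as the sentinel (named `STOP` here) and a hypothetical `done` list that records processed jobs:

```python
from queue import Queue
from threading import Thread

TODO = Queue()
STOP = None   # hypothetical sentinel meaning "no more work"
done = []     # hypothetical record of processed jobs


def worker():
    while True:
        job = TODO.get()
        if job is STOP:
            # Acknowledge the sentinel and leave the loop.
            TODO.task_done()
            break
        done.append(job)
        TODO.task_done()


workers = [Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()

for job in ['ping', 'ls -la', 'echo "hello world"']:
    TODO.put(job)

# One sentinel per worker, queued after the real jobs.
for _ in workers:
    TODO.put(STOP)

TODO.join()
for w in workers:
    w.join()

print(len(done))  # 3
```

Because the queue is first-in, first-out, the sentinels are consumed only after all real jobs have been taken.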

9.3.7. Assignments

Code 9.29. Solution
"""
* Assignment: Concurrency Threading Timer
* Complexity: easy
* Lines of code: 4 lines
* Time: 8 min

English:
    1. Define function `ping()`, with optional parameter
       `n: int`, which defaults to 1
    2. Function `ping()` should append value of `n` to `result`
    3. Function should be called every `INTERVAL`
    4. Function should be called maximum `MAX` times
    5. Use `Timer` from `threading` module
    6. Run doctests - all must succeed

Polish:
    1. Zdefiniuj funkcję `ping(n: int)` z opcjonalnym parametrem
       `n: int`, który domyślnie jest 1
    2. Funkcja `ping()` powinna dopisywać wartość `n` do `result`
    3. Funkcja powinna być wywoływana co `INTERVAL`
    4. Funkcja powinna być wywołana maksymalnie `MAX` razy
    5. Użyj `Timer` z modułu `threading`
    6. Uruchom doctesty - wszystkie muszą się powieść

Tests:
    >>> import sys; sys.tracebacklimit = 0

    >>> def check(result):
    ...     assert result == [1, 2, 3], f'Result is {result}'

    >>> Timer(INTERVAL, ping).start()
    >>> Timer(INTERVAL*MAX+1, check, [result]).start()
"""
from threading import Timer


INTERVAL = 1.0
MAX = 3
result = []


Code 9.30. Solution
"""
* Assignment: Threading Timer File
* Complexity: medium
* Lines of code: 13 lines
* Time: 13 min

English:
    1. Modify class `File`
    2. Add class configuration attribute `AUTOSAVE_SECONDS: float = 1.0`
    3. Save buffer content to file every `AUTOSAVE_SECONDS` seconds
    4. Writing and reading take time; how can the buffer be saved in the background while it can still be used?
    5. Run doctests - all must succeed

Polish:
    1. Zmodyfikuj klasę `File`
    2. Dodaj klasowy atrybut konfiguracyjny `AUTOSAVE_SECONDS: float = 1.0`
    3. Zapisuj zawartość bufora do pliku co `AUTOSAVE_SECONDS` sekund
    4. Operacje zapisu i odczytu trwają, jak zrobić, aby do bufora podczas zapisu na dysk, nadal można było pisać?
    5. Uruchom doctesty - wszystkie muszą się powieść

Hint:
    * `from threading import Timer`
    * `timer = Timer(interval, function)`
    * `timer.start()`
    * `timer.cancel()`
    * `ctrl+c` or stop button kills infinite loop

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'AUTOSAVE_SECONDS')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.AUTOSAVE_SECONDS == 1.0

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')
    ...    file.append('Three')
    ...    file.append('Four')
    ...    file.append('Five')
    ...    file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'
    >>> from os import remove
    >>> remove('_temporary.txt')
"""

from threading import Timer


class File:
    filename: str
    _content: list[str]

    def __init__(self, filename):
        self.filename = filename
        self._content = list()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        with open(self.filename, mode='w') as file:
            file.writelines(self._content)

    def append(self, line):
        self._content.append(line + '\n')


Code 9.31. Solution
"""
* Assignment: Concurrency Threading Subprocess
* Complexity: easy
* Lines of code: 20 lines
* Time: 21 min

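English:
    1. Create a queue `queue` and add various system commands to execute, e.g.:
        a. Linux/macOS: `['/bin/ls /etc/', '/bin/echo "test"', '/bin/sleep 2']`,
        b. Windows: `['dir c:\\Users', 'echo "test"', 'type %HOMEPATH%\\Desktop\\README.txt']`.
    2. Then prepare three worker threads that will execute commands from the queue
    3. Threads should run the commands in the operating system using `subprocess.run()` with a timeout of `TIMEOUT = 2.0` seconds
    4. The number of commands may grow while the task is running
    5. Threads should run in the background (as `daemon` threads)
    6. Run doctests - all must succeed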

Polish:
    1. Stwórz kolejkę `queue` do której dodasz różne polecenia systemowe do wykonania, np.:
        a. Linux/macOS: `['/bin/ls /etc/', '/bin/echo "test"', '/bin/sleep 2']`,
        b. Windows: `['dir c:\\Users', 'echo "test"', 'type %HOMEPATH%\\Desktop\\README.txt']`.
    2. Następnie przygotuj trzy wątki workerów, które będą wykonywały polecenia z kolejki
    3. Wątki powinny być uruchamiane jako `subprocess.run()` w systemie operacyjnym z timeoutem równym `TIMEOUT = 2.0` sekundy
    4. Ilość poleceń może się zwiększać w miarę wykonywania zadania.
    5. Wątki mają być uruchomione w tle (ang. `daemon`)
    6. Uruchom doctesty - wszystkie muszą się powieść

Extra task:
    1. Threads should be delayed with `Timer` for `DELAY = 5.0` seconds and then start working
    2. Split command arguments using `shlex`
    3. Use the `logging` library so that the process and thread identifiers are visible in the output

Hints:
    * Set the `shell=True` parameter for `subprocess.run()`

Tests:
    >>> import sys; sys.tracebacklimit = 0

    TODO: Doctests
"""

TIMEOUT = 2.0
DELAY = 5.0
TODO = ['ping python.astrotech.io',
        'ls -la',
        'echo "hello world"',
        'cat /etc/passwd']