2. Threads

2.1. Threads vs processes

2.1.1. Process

  1. Co to jest proces?

  2. Ile czasu trwa tworzenie procesów?

  3. Kto zarządza procesami?

  4. Ile może być równoległych procesów?

  5. Co to jest nice

  6. Jak komunikować się między procesami?

2.1.2. Thread

  1. Co to jest wątek?

  2. Ile czasu trwa tworzenie wątków?

  3. Kto zarządza wątkami?

  4. Ile może być równoległych wątków?

  5. Ile wątków może być w ramach jednego procesu?

  6. Jak komunikować się między wątkami?

  7. Czy współdzielenie pamięci przez wątki jest dobre czy złe?

2.1.3. Threads vs processes

  1. Czym się różnią wątki od procesów?

  2. Ile może być wątków przetwarzanych równolegle na procesorze czterordzeniowym (z i bez Hyper Threading)?

  3. Ile może być procesów przetwarzanych równolegle na procesorze czterordzeniowym (z i bez Hyper Threading)?

  4. Jak na wątki i procesy wpływa GIL?

2.2. Problemy z wątkami

  • Zakleszczania

  • Race Condition

  • Głodzenie

  • Problem 5 filozofów:

    • 5 filozofów (albo rozmyśla, albo je)

    • 5 misek ze spaghetti,

    • 5 widelców,

    • 2 widelce potrzebne aby zjeść,

    • problem zakleszczania

  • Problem producenta i konsumenta

  • Problem czytelników i pisarzy

2.3. threading

2.3.1. daemon flag

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

2.3.2. Delay execution

  • dlaczego nie time.sleep()

  • rekurencyjny timer

Listing 424. Delay execution
from threading import Timer


DELAY_SECONDS = 5.0

def hello():
    print('Hello world!')


t = Timer(DELAY_SECONDS, hello)
t.start()

print('Main Thread')

2.3.3. Recurrent timer

Listing 425. Recurrent timer
from threading import Timer


DELAY_SECONDS = 5.0

def hello():
    print('Timer Thread')
    Timer(DELAY_SECONDS, hello).start()


t = Timer(DELAY_SECONDS, hello)
t.start()

print('Main Thread')

2.4. Tworzenie wątków

from threading import Thread


class MyThread(Thread):
    def run(self):
        print('hello')


t = MyThread()
t.start()

2.5. Synchronizacja wątków

from threading import Thread


class MyThread(Thread):
    def run(self):
        print('hello')


t1 = MyThread()
t1.start()

t2 = MyThread()
t2.start()

t1.join()
t2.join()
from threading import Thread

RUNNING = []


class MyThread(Thread):
    def run(self):
        print('hello')


t1 = MyThread()
t1.start()
RUNNING.append(t1)

t2 = MyThread()
t2.start()
RUNNING.append(t2)

for thread in RUNNING:
    thread.join()
from threading import Thread

RUNNING = []


class MyThread(Thread):
    def run(self):
        print('hello')


def spawn(cls, count=1):
    for i in range(count):
        t = cls()
        t.start()
        RUNNING.append(t)


spawn(MyThread, count=10)


for thread in RUNNING:
    thread.join()

2.6. Zamykanie wątków

Listing 426. Synchronizacja wątków
from queue import Queue
from threading import Thread, Lock
from time import sleep


EXIT = False
LOCK = Lock()
TODO = Queue()
RUNNING = []


class MyThread(Thread):
    def run(self):
        while not EXIT:
            # Remove and return an item from the queue.
            job = TODO.get()

            # Execute work
            print(f'Will do the work: {job}')

            # Indicate that a formerly enqueued task is complete.
            TODO.task_done()
            sleep(1)

        print(f'Exiting {self.name}')


# Create new threads
def spawn_worker(count=1):
    for i in range(count):
        thread = MyThread()
        thread.start()
        RUNNING.append(thread)


if __name__ == '__main__':
    spawn_worker(5)

    # Fill the queue
    with LOCK:
        for task in ['One', 'Two', 'Three', 'Four', 'Five']:
            TODO.put(task)

    # Wait for queue to empty
    while not TODO.empty():
        pass

    # Notify threads it's time to exit
    EXIT = True

    # Wait for all threads to complete
    for thread in RUNNING:
        thread.join()

    print(f'Exiting Main Thread')

2.7. Workery

Listing 427. Model Workerów
from queue import Queue
from threading import Thread

TODO = Queue()


class Worker(Thread):
    def run(self):
        while True:
            # Remove and return an item from the queue.
            job = TODO.get()

            # Execute work
            print(f'Will do the work: {job}')

            # Indicate that a formerly enqueued task is complete.
            TODO.task_done()


def spawn_worker(count=1):
    for i in range(count):
        Worker().start()


if __name__ == '__main__':
    spawn_worker(3)

    TODO.put('ping')
    TODO.put('ls -la')
    TODO.put('echo "hello world"')
    TODO.put('cat /etc/passwd')

    # wait to complete all tasks
    TODO.join()

2.8. Assignments

2.8.1. Wielowątkowość

  1. Stwórz kolejkę queue do której dodasz różne polecenia systemowe do wykonania, np.:

    • Linux/macOS: ['/bin/ls /etc/', '/bin/echo "test"', '/bin/sleep 2'],

    • Windows: ['dir c:\\Users', 'echo "test"', 'type %HOMEPATH%\Desktop\README.txt'].

  2. Następnie przygotuj trzy wątki workerów, które będą wykonywały polecenia z kolejki

  3. Wątki powinny być uruchamiane jako subprocess.run() w systemie operacyjnym z timeoutem równym TIMEOUT = 2.0 sekundy

  4. Ilość poleceń może się zwiększać w miarę wykonywania zadania.

  5. Wątki mają być uruchomione w tle (ang. daemon)

Zadanie z gwiazdką
  1. Wątki powinny być uśpione za pomocą Timer przez DELAY = 5.0 sekund, a następnie ruszyć do roboty

  2. Parametry rozbij za pomocą shlex

  3. Użyj logowania za pomocą biblioteki logging tak aby przy wyświetlaniu wyników widoczny był identyfikator procesu i wątku.

Hint

Ustaw parametr shell=True dla subprocess.run()