2. Threads

2.1. Threads vs processes

2.1.1. Process

  1. Co to jest proces?

  2. Ile może być procesów?

  3. Co to jest nice

  4. Jak komunikować się między procesami?

  5. Ile może być procesów przetwarzanych równolegle na procesorze czterordzeniowym (z i bez Hyper Threading)?

2.1.2. Thread

  1. Co to jest wątek?

  2. Ile wątków może być w ramach procesów?

  3. Jak komunikować się między wątkami?

  4. Czy współdzielenie pamięci przez wątki jest dobre czy złe?

  5. Ile może być wątków przetwarzanych równolegle na procesorze czterordzeniowym (z i bez Hyper Threading)?

2.1.3. Threads vs processes

  1. Czym się różnią wątki od procesów?

2.2. Problemy z wątkami

  • Zakleszczania

  • Race Condition

  • Głodzenie

  • Problem 5 filozofów:

    • 5 filozofów (albo rozmyśla, albo je)

    • 5 misek ze spaghetti,

    • 5 widelców,

    • 2 widelce potrzebne aby zjeść,

    • problem zakleszczania

  • Problem producenta i konsumenta

  • Problem czytelników i pisarzy

2.3. threading

2.3.1. Delay execution

  • dlaczego nie time.sleep()

  • rekurencyjny timer

Listing 2.72. Delay execution
from threading import Timer


DELAY_SECONDS = 5.0

def hello():
    print('Hello world!')


t = Timer(DELAY_SECONDS, hello)
t.start()

print('Main Thread')

2.3.2. Recurrent timer

Listing 2.73. Recurrent timer
from threading import Timer


DELAY_SECONDS = 5.0

def hello():
    print('Timer Thread')
    Timer(DELAY_SECONDS, hello).start()


t = Timer(DELAY_SECONDS, hello)
t.start()

print('Main Thread')

2.4. Tworzenie wątków

2.5. Synchronizacja wątków

Listing 2.74. Synchronizacja wątków
from queue import Queue
from threading import Thread, Lock
from time import sleep


class MyThread(Thread):
    def __init__(self, thread_name, work_queue):
        self.thread_name = thread_name
        self.work_queue = work_queue
        super().__init__()

    def run(self):
        print(f'Starting {self.thread_name}')

        while not exit_flag:
            lock.acquire()

            if not self.work_queue.empty():
                data = work_queue.get()
                lock.release()

                # Release Queue before processing
                print(f'{self.thread_name} processing {data}')
            else:
                lock.release()

            sleep(1)

        print(f'Exiting {self.thread_name}')


exit_flag = 0
lock = Lock()
work_queue = Queue()
running_threads = []

# Create new threads
for name in ['Thread-1', 'Thread-2', 'Thread-3']:
    thread = MyThread(name, work_queue)
    thread.start()
    running_threads.append(thread)

# Fill the queue
lock.acquire()
for word in ['One', 'Two', 'Three', 'Four', 'Five']:
    work_queue.put(word)
lock.release()

# Wait for queue to empty
while not work_queue.empty():
    pass

# Notify threads it's time to exit
exit_flag = 1

# Wait for all threads to complete
for thread in running_threads:
    thread.join()

print(f'Exiting Main Thread')

2.6. Zamykanie wątków

2.7. Workery

Listing 2.75. Model Workerów
import queue
import logging
import threading

work_queue = queue.Queue()


class Worker(threading.Thread):
    daemon = True

    def run(self):
        while True:
            # Remove and return an item from the queue.
            job = work_queue.get()

            # Execute work
            logging.warning('Will do the work: %s' % job)

            # Indicate that a formerly enqueued task is complete.
            work_queue.task_done()


def spawn_worker(how_many):
    for i in range(how_many):
        Worker().start()


if __name__ == '__main__':
    spawn_worker(3)

    # Zapełnij kolejkę
    for todo in ['ping', 'ls -la', 'echo "hello world"', 'cat /etc/passwd']:
        work_queue.put(todo)

    # wait to complete all tasks
    work_queue.join()

2.8. Assignments

2.8.1. Wielowątkowość

  • Filename: thread_execute.py

  • Lines of code to write: 20 lines

  • Estimated time of completion: 30 min

  1. Stwórz kolejkę queue do której dodasz różne polecenia systemowe do wykonania, np.:

    • Lunux/macOS: ['/bin/ls /etc/', '/bin/echo "test"', '/bin/sleep 2'],

    • Windows: ['dir c:\\Users', 'echo "test"', 'type %HOMEPATH%\Desktop\README.txt'].

  2. Następnie przygotuj trzy wątki workerów, które będą wykonywały polecenia z kolejki

  3. Wątki powinny być uruchamiane jako subprocess.run() w systemie operacyjnym z timeoutem równym TIMEOUT = 2.0 sekundy

  4. Ilość poleceń może się zwiększać w miarę wykonywania zadania.

  5. Wątki mają być uruchomione w tle (ang. daemon)

Zadanie z gwiazdką
  1. Wątki powinny być uśpione za pomocą Timer przez DELAY = 5.0 sekund, a następnie ruszyć do roboty

  2. Parametry rozbij za pomocą shlex

  3. Użyj logowania za pomocą biblioteki logging tak aby przy wyświetlaniu wyników widoczny był identyfikator procesu i wątku.