1. Queues, Threads and Processes

1.1. Kolejki - Queue

1.1.1. FIFO

1.1.2. LIFO (stack)

1.1.3. Priority Queue

  • Priorytetyzacja

  • Wywłaszczenie

1.1.4. Network Queue

  • Synchronizacja

  • Routing

1.2. Threads vs processes

  • Czym się różnią wątki od procesów?

  • Ile wątków może być w ramach procesów?

  • Jak komunikować się między wątkami?

  • Jak komunikować się między procesami?

  • Ile może być wątków przetwarzanych równolegle na procesorze czterordzeniowym (z i bez Hyper Threading)?

  • Ile może być procesów przetwarzanych równolegle na procesorze czterordzeniowym (z i bez Hyper Threading)?

  • Czy współdzielenie pamięci przez wątki jest dobre czy złe?

1.3. Threads

1.3.1. Problemy z wątkami

  • Zakleszczania

  • Race Condition

  • Głodzenie

  • Problem 5 filozofów:

    • 5 filozofów (albo rozmyśla, albo je)

    • 5 misek ze spaghetti,

    • 5 widelców,

    • 2 widelce potrzebne aby zjeść,

    • problem zakleszczania

  • Problem producenta i konsumenta

  • Problem czytelników i pisarzy

1.3.2. Tworzenie wątków

  • dlaczego nie time.sleep()

  • rekurencyjny timer

Code Listing 1.42. Timer
import threading


def hello():
    print('Timer Thread')


t = threading.Timer(5.0, hello)
t.start()

print('Main Thread')

1.3.3. Synchronizacja wątków

Code Listing 1.43. Synchronizacja wątków
import queue
import threading
import time


class MyThread(threading.Thread):

    def __init__(self, thread_name, work_queue):
        self.thread_name = thread_name
        self.work_queue = work_queue
        super().__init__()

    def run(self):
        print(f'Starting {self.thread_name}')

        while not exit_flag:
            lock.acquire()

            if not work_queue.empty():
                data = work_queue.get()
                lock.release()

                # Release Queue before processing
                print(f'{self.thread_name} processing {data}')
            else:
                lock.release()

            time.sleep(2)

        print(f'Exiting {self.thread_name}')


exit_flag = 0
lock = threading.Lock()
work_queue = queue.Queue()
running_threads = []

# Create new threads
for name in ['Thread-1', 'Thread-2', 'Thread-3']:
    thread = MyThread(name, work_queue)
    thread.start()
    running_threads.append(thread)

# Fill the queue
lock.acquire()
for word in ['One', 'Two', 'Three', 'Four', 'Five']:
    work_queue.put(word)
lock.release()

# Wait for queue to empty
while not work_queue.empty():
    pass

# Notify threads it's time to exit
exit_flag = 1

# Wait for all threads to complete
for thread in running_threads:
    thread.join()

print(f'Exiting Main Thread')

1.3.4. Zamykanie wątków

1.3.5. Workery

Code Listing 1.44. Model Workerów
import queue
import logging
import threading

work_queue = queue.Queue()


class Worker(threading.Thread):
    daemon = True

    def run(self):
        while True:
            # Remove and return an item from the queue.
            job = work_queue.get()

            # Execute work
            logging.warning('Will do the work: %s' % job)

            # Indicate that a formerly enqueued task is complete.
            work_queue.task_done()


def spawn_worker(how_many):
    for i in range(how_many):
        Worker().start()


if __name__ == '__main__':
    spawn_worker(3)

    # Zapełnij kolejkę
    for todo in ['ping', 'ls -la', 'echo "hello world"', 'cat /etc/passwd']:
        work_queue.put(todo)

    # wait to complete all tasks
    work_queue.join()

1.4. Assignments

1.4.1. Wielowątkowość

  • Filename: multiprocessing_server.py and multiprocessing_client.py

  • Lines of code to write: 50 lines

  • Estimated time of completion: 30 min

  1. Stwórz kolejkę queue do której dodasz różne polecenia systemowe do wykonania, np.:

    • Lunux/macOS: ['/bin/ls /etc/', '/bin/echo "test"', '/bin/sleep 2'],

    • Windows: ['dir c:\\Users', 'echo "test"', 'type %HOMEPATH%\Desktop\README.txt'].

  2. Parametry rozbij za pomocą shlex

  3. Następnie przygotuj trzy wątki workerów, które będą wykonywały polecenia z kolejki.

  4. Wątki powinny być uruchamiane jako subprocess.run() w systemie operacyjnym z timeoutem równym PROCESSING_TIMEOUT = 2.0 sekundy

  5. Ilość poleceń może się zwiększać w miarę wykonywania zadania.

  6. Wątki powinny być uśpione za pomocą Timer przez 5.0 sekund, a następnie ruszyć do roboty.

  7. Wątki mają być uruchomione w tle (ang. daemon)

  8. Użyj logowania za pomocą biblioteki logging tak aby przy wyświetlaniu wyników widoczny był identyfikator procesu i wątku.