10. Kolejki, wątki i procesy

10.1. Kolejki - Queue

10.1.1. FIFO

10.1.2. LIFO (stack)

10.1.3. Network Queue

  • Synchronizacja
  • Routing

10.1.4. Priority Queue

  • Priorytetyzacja
  • Wywłaszczenie

10.2. Wątki a procesy

  • Czym się różnią wątki od procesów?
  • Ile wątków może być w ramach procesów?
  • Jak komunikować się między wątkami?
  • Jak komunikować się między procesami?
  • Ile może być wątków na procesorze czterordzeniowym (z i bez Hyper Threading)?
  • Ile może być procesów na procesorze czterordzeniowym (z i bez Hyper Threading)?
  • Czy współdzielenie pamięci przez wątki jest dobre czy złe?

10.3. Wątki

10.3.1. Problemy z wątkami

  • Zakleszczania

  • Race Condition

  • Głodzenie

  • Problem 5 filozofów:

    • 5 filozofów (albo rozmyśla, albo je)
    • 5 misek ze spagetti,
    • 5 widelców,
    • 2 widelce potrzebne aby zjeść,
    • problem zakleszczania
  • Problem producenta i konsumenta

  • Problem czytelników i pisarzy

10.3.2. Tworzenie wątków

  • dlaczego nie time.sleep()
  • rekurencyjny timer
Code Listing 10.2. Timer
import threading


def hello():
    print('Timer Thread')


t = threading.Timer(5.0, hello)
t.start()

print('Main Thread')

10.3.3. Synchronizacja wątków

Code Listing 10.3. Synchronizacja wątków
import queue
import threading
import time


class MyThread(threading.Thread):

    def __init__(self, thread_name, work_queue):
        self.thread_name = thread_name
        self.work_queue = work_queue
        super().__init__()

    def run(self):
        print(f'Starting {self.thread_name}')

        while not exit_flag:
            lock.acquire()

            if not work_queue.empty():
                data = work_queue.get()
                lock.release()

                # Release Queue before processing
                print(f'{self.thread_name} processing {data}')
            else:
                lock.release()

            time.sleep(2)

        print(f'Exiting {self.thread_name}')


exit_flag = 0
lock = threading.Lock()
work_queue = queue.Queue()
running_threads = []

# Create new threads
for name in ['Thread-1', 'Thread-2', 'Thread-3']:
    thread = MyThread(name, work_queue)
    thread.start()
    running_threads.append(thread)

# Fill the queue
lock.acquire()
for word in ['One', 'Two', 'Three', 'Four', 'Five']:
    work_queue.put(word)
lock.release()

# Wait for queue to empty
while not work_queue.empty():
    pass

# Notify threads it's time to exit
exit_flag = 1

# Wait for all threads to complete
for thread in running_threads:
    thread.join()

print(f'Exiting Main Thread')

10.3.4. Zamykanie wątków

10.3.5. Workery

Code Listing 10.4. Model Workerów
import queue
import logging
import threading

work_queue = queue.Queue()


class Worker(threading.Thread):
    daemon = True

    def run(self):
        while True:
            # Remove and return an item from the queue.
            job = work_queue.get()

            # Execute work
            logging.warning('Will do the work: %s' % job)

            # Indicate that a formerly enqueued task is complete.
            work_queue.task_done()


def spawn_worker(how_many):
    for i in range(how_many):
        Worker().start()


if __name__ == '__main__':
    spawn_worker(3)

    # Zapełnij kolejkę
    for todo in ['ping', 'ls -la', 'echo "hello world"', 'cat /etc/passwd']:
        work_queue.put(todo)

    # wait to complete all tasks
    work_queue.join()

10.4. Procesy

10.4.1. Problemy z procesami

  • Zakleszczania
  • Race Condition

10.4.2. Tworzenie procesów

10.4.3. Synchronizacja procesów

10.4.4. IPC - komunikacja międzyprocesowa

Aby pickle mógł odtworzyć obiekt, musi posiadać jego definicję - klasę.

Code Listing 10.5. Klasa Prostokat w module figury
class Prostokat:

    def __init__(self, a, b):
        self.a = float(a)
        self.b = float(b)

    def pole(self):
        return self.a * self.b

    def obwod(self):
        return (self.a + self.b) * 2

    def __str__(self):
        return 'Prostokat(a=%s, b=%s)' % (self.a, self.b)
Code Listing 10.6. Obiekt wysyłający dane multiprocessing-client.py
from multiprocessing.connection import Client
import logging
import pickle
from .figury import Prostokat

rectangle = Prostokat(a=5, b=10)
rect = pickle.dumps(rectangle)

address = ('localhost', 6000)
conn = Client(address, authkey=b'secret password')

logging.warning('Sending objects')
conn.send([rect, 'a', 2.5, None, int, sum])

logging.warning('Sending close')
conn.send('close')

conn.close()
Code Listing 10.7. Obiekt nasłuchujący na połączenia multiprocessing-listener.py
from multiprocessing.connection import Listener
import logging
import pickle
from .figury import Prostokat

address = ('localhost', 6000)  # family is deduced to be 'AF_INET'

logging.warning('Listening on %s:%s' % address)
listener = Listener(address, authkey=b'secret password')
conn = listener.accept()

logging.warning('connection accepted from %s %s' % listener.last_accepted)

while True:
    msg = conn.recv()
    logging.warning('Received: %s' % msg)

    if msg == 'close':
        conn.close()
        break
    else:
        # do something with msg
        prostokat = pickle.loads(msg[0])
        logging.warning('Prostokat %s' % prostokat)
        print('Pole: %s' % prostokat.pole())

listener.close()

10.4.5. Zamykanie procesów

10.5. Zadania kontrolne

10.5.1. Wielowątkowość

  1. Stwórz kolejkę queue do której dodasz różne polecenia systemowe do wykonania, np.:

    • Lunux/macOS: ['/bin/ls /etc/', '/bin/echo "test"', '/bin/sleep 2'],
    • Windows: ['dir c:\\Users', 'echo "test"', 'type %HOMEPATH%\Desktop\README.txt'].
  2. Parametry rozbij za pomocą shlex

  3. Następnie przygotuj trzy wątki workerów, które będą wykonywały polecenia z kolejki.

  4. Wątki powinny być uruchamiane jako subprocess.run() w systemie operacyjnym z timeoutem równym PROCESSING_TIMEOUT = 2.0 sekundy

  5. Ilość poleceń może się zwiększać w miarę wykonywania zadania.

  6. Wątki powinny być uśpione za pomocą Timer przez 5.0 sekund, a następnie ruszyć do roboty.

  7. Wątki mają być uruchomione w tle (ang. daemon)

  8. Użyj logowania za pomocą biblioteki logging tak aby przy wyświetlaniu wyników widoczny był identyfikator procesu i wątku.