3.9. Asynchronous processing

3.9.1. ASGI

ASGI (Asynchronous Server Gateway Interface) is a spiritual successor to WSGI, intended to provide a standard interface between async-capable Python web servers, frameworks, and applications.

Where WSGI provided a standard for synchronous Python apps, ASGI provides one for both asynchronous and synchronous apps, with a WSGI backwards-compatibility implementation and multiple servers and application frameworks.
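As a minimal sketch of that interface: an ASGI application is an async callable that takes scope, receive and send. The call_app driver below is a hypothetical stand-in for a real ASGI server and exists only so the example runs on its own.

```python
import asyncio


async def app(scope, receive, send):
    """Minimal ASGI application: answers every HTTP request with plain text."""
    assert scope["type"] == "http"
    # First message: status line and headers.
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    # Second message: the response body.
    await send({
        "type": "http.response.body",
        "body": b"Hello, ASGI!",
    })


async def call_app():
    """Hypothetical driver that plays the role of the server for one request."""
    sent = []

    async def receive():
        # An empty request body with nothing more to come.
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": "/", "headers": []}
    await app(scope, receive, send)
    return sent


messages = asyncio.run(call_app())
print(messages[0]["status"], messages[1]["body"])
```

In practice the app callable would be handed to an ASGI server instead of the driver, for example uvicorn example:app, assuming the code is saved as example.py.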

3.9.2. Celery

Celery

A task queue implementation for Python web applications used to asynchronously execute work outside the HTTP request-response cycle. Celery can be used to run batch jobs in the background on a regular schedule.

3.9.2.1. Why?

  • You want your WSGI server to respond to incoming requests as quickly as possible.

  • Each request ties up a worker process until the response is finished.

  • Moving work off those workers by spinning up asynchronous jobs as tasks in a queue is a straightforward way to improve WSGI server response times.

3.9.2.2. Celery daemon

  • celeryd (started with the celery worker command in current releases)

  • Executes tasks

  • Runs workers that handle whatever tasks you put in the queue

  • Each worker performs one task at a time

  • When the task is completed, the worker picks up the next one

  • The cycle repeats continuously

  • Workers wait idly when there are no more tasks
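The cycle described above can be sketched with the standard library alone. This is not how Celery is implemented. It is a toy model that uses threads and an in-process queue in place of worker processes and a message broker, and every name in it is an assumption made for illustration.

```python
import queue
import threading

task_queue = queue.Queue()
results = []
results_lock = threading.Lock()


def add(x, y):
    return x + y


def worker():
    """Take a task, run it, then go back for the next one."""
    while True:
        item = task_queue.get()      # blocks (waits idly) while the queue is empty
        if item is None:             # sentinel value: shut this worker down
            task_queue.task_done()
            break
        func, args = item
        value = func(*args)
        with results_lock:
            results.append(value)
        task_queue.task_done()


# Start two workers, then queue three tasks for them.
threads = [threading.Thread(target=worker) for _ in range(2)]
for thread in threads:
    thread.start()

for pair in [(1, 2), (3, 4), (5, 6)]:
    task_queue.put((add, pair))

task_queue.join()                    # wait until every queued task is finished
for _ in threads:
    task_queue.put(None)             # one sentinel per worker
for thread in threads:
    thread.join()

print(sorted(results))               # [3, 7, 11]
```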

3.9.2.3. Celerybeat

  • scheduler

  • cron-like

  • example execution:

    • at time intervals (every 5 seconds or once a week),

    • on a specific date or time (at 5:03pm every Sunday)
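Both kinds of schedule can be expressed through the beat_schedule setting. The fragment below is a sketch only: the entry names, the tasks.add task and the broker URL are assumptions, and the times are interpreted in the application's configured timezone (UTC unless changed).

```python
from celery import Celery
from celery.schedules import crontab

# Assumed application; 'tasks.add' must be a task registered under that name.
app = Celery('tasks', broker='amqp://')

app.conf.beat_schedule = {
    # Interval schedule: run every 5 seconds.
    'add-every-5-seconds': {
        'task': 'tasks.add',
        'schedule': 5.0,
        'args': (4, 4),
    },
    # Crontab schedule: 5:03pm every Sunday (day_of_week 0 is Sunday).
    'add-sunday-5-03pm': {
        'task': 'tasks.add',
        'schedule': crontab(hour=17, minute=3, day_of_week=0),
        'args': (4, 4),
    },
}
```

The scheduler itself runs as a separate process, for example celery -A tasks beat --loglevel=info, and only sends the tasks to the queue. A worker still has to be running to execute them.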

3.9.2.4. Install

  • Requires a message broker; the examples below use RabbitMQ

pip install celery

3.9.2.5. Basic usage

  1. Define a task in the tasks.py file by decorating a function with @app.task

    Listing 491. tasks.py
    from celery import Celery
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    
    @app.task
    def add(x, y):
        return x + y
    
  2. Run a Celery worker with the tasks module (using verbose "info" logging)

    celery -A tasks worker --loglevel=info
    
  3. Call the function asynchronously with the .delay() method that Celery adds to the task

    from tasks import add
    
    result = add.delay(4, 4)
    
  4. To store results, configure a result backend:

    app = Celery('tasks', backend='db+sqlite:///results.sqlite', broker='amqp://')
    
  5. Check status

    result.ready()
    # False
    
    result.failed()
    # False
    
    result.successful()
    # False
    
    result.state       # PENDING -> STARTED -> SUCCESS
    # 'PENDING'
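The status calls in step 5 are usually combined into a wait loop. The helper below is a sketch that relies only on the ready() method and state attribute shown above. wait_until_ready and FakeResult are hypothetical names, and FakeResult exists only so the example runs without a broker. With a real result backend, Celery's own result.get(timeout=10) blocks in a similar way and returns the task's return value.

```python
import time


def wait_until_ready(result, timeout=10.0, interval=0.5):
    """Poll result.ready() until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task not finished after {timeout} seconds")
        time.sleep(interval)
    return result.state


class FakeResult:
    """Hypothetical stand-in for a Celery result object."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done
        self.state = "PENDING"

    def ready(self):
        # Report "not ready" a fixed number of times, then succeed.
        if self._remaining > 0:
            self._remaining -= 1
            return False
        self.state = "SUCCESS"
        return True


final_state = wait_until_ready(FakeResult(polls_until_done=3), timeout=5.0, interval=0.01)
print(final_state)
```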
    

3.9.3. RabbitMQ

  • RabbitMQ is one of the most widely deployed open source message brokers

  • Implementation of the Advanced Message Queuing Protocol (AMQP)

  • AMQP is an open standard

Messaging

A message is a way of exchanging information between applications, servers, and processes. When two applications share data through messages, the receiver can decide when to react to the data it receives. To exchange data effectively, each application should be independent of the other. This independence is where a message broker comes in.

Message Broker

A message broker is an application that stores messages on behalf of other applications. Whenever an application wants to send data to another application, it publishes a message to the broker. The broker then determines which queue the message belongs to and makes it available to the applications connected to that queue, which can then consume it.

The message broker, such as RabbitMQ, is responsible for keeping the message until a consumer takes it. Queues are just virtually infinite buffers that store messages.
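The publish, store and consume flow can be illustrated with a toy in-memory model. ToyBroker and its methods are hypothetical names, not RabbitMQ's API. A real broker also handles networking, routing, acknowledgements and persistence.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Keeps published messages in named queues until a consumer takes them."""

    def __init__(self):
        self._queues = defaultdict(deque)

    def publish(self, queue_name, message):
        # The producer does not need to know who will consume the message.
        self._queues[queue_name].append(message)

    def consume(self, queue_name):
        # Return the oldest message in the queue, or None if it is empty.
        pending = self._queues[queue_name]
        return pending.popleft() if pending else None


broker = ToyBroker()
broker.publish("emails", {"to": "alice", "subject": "Welcome"})
broker.publish("emails", {"to": "bob", "subject": "Welcome"})

first = broker.consume("emails")
second = broker.consume("emails")
third = broker.consume("emails")
print(first["to"], second["to"], third)   # alice bob None
```

Messages come out in the order they were published, and the producer finishes its work before any consumer runs, which is the independence described above.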

3.9.3.1. Install

Using Docker:

docker run -d -p 5672:5672 rabbitmq

Ubuntu or Debian package:

echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee -a /etc/apt/sources.list
curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install -y rabbitmq-server

3.9.3.2. Config

vim /etc/default/rabbitmq-server

3.9.3.3. Management Console

  • Manage users and their permissions and roles

  • Create new queues

  • Manage queues, monitor their consumption rate etc.

  • Purge data which is currently on queues

  • Send and receive messages

  • Monitor memory usage per queue and for the overall process

sudo rabbitmq-plugins enable rabbitmq_management
open http://localhost:15672/

Default credentials are:

  • username: guest

  • password: guest

Create a separate administrator account rather than relying on the default one:

sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

3.9.3.4. Manage RabbitMQ

Listing 492. Start the service
service rabbitmq-server start
Listing 493. Stop the service
service rabbitmq-server stop
Listing 494. Restart the service
service rabbitmq-server restart
Listing 495. Check the status
service rabbitmq-server status