1.17. Asynchronous processing

1.17.1. ASGI

ASGI (Asynchronous Server Gateway Interface) is a spiritual successor to WSGI, intended to provide a standard interface between async-capable Python web servers, frameworks, and applications.

Where WSGI provided a standard for synchronous Python apps, ASGI provides one for both asynchronous and synchronous apps, with a WSGI backwards-compatibility implementation and multiple servers and application frameworks.

1.17.2. Celery

Celery

A task queue implementation for Python web applications used to asynchronously execute work outside the HTTP request-response cycle. Celery can be used to run batch jobs in the background on a regular schedule.

1.17.2.1. Rationale

  • You want your WSGI server to respond to incoming requests as quickly as possible.

  • Each request ties up a worker process until the response is finished.

  • Moving work off those workers by spinning up asynchronous jobs as tasks in a queue is a straightforward way to improve WSGI server response times.

1.17.2.2. Celery daemon

  • celeryd

  • Executes tasks

  • Workers that handle whatever tasks you put

  • Each worker will perform a task

  • When the task is completed will pick up the next one

  • The cycle will repeat continuously

  • Waiting idly when there are no more tasks

1.17.2.3. Celerybeat

  • scheduler

  • cron like

  • example execution:

    • at time intervals (every 5 seconds or once a week),

    • on a specific date or time (at 5:03pm every Sunday)

1.17.2.4. Install

  • Requires RabbitMQ

$ pip install celery

1.17.2.5. Basic usage

  1. Define task in tasks.py file by decorating function

    Listing 1.57. tasks.py
    from celery import Celery
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    
    @app.task
    def add(x, y):
        return x + y
    
  2. Run Celery workers with tasks module (use verbose "info" logging)

    $ celery -A tasks worker --loglevel=info
    
  3. Call function asynchronously by using .delay() special method added by Celery

    from tasks import add
    
    result = add.delay(4, 4)
    
  4. If you want to store results use:

    app = Celery('tasks', backend='db+sqlite:///results.sqlite', broker='amqp://')
    
  5. Check status

    result.ready()
    # False
    
    result.failed()
    # False
    
    result.successful()
    # False
    
    result.state       # PENDING -> STARTED -> SUCCESS
    # 'PENDING'
    

1.17.3. RabbitMQ

  • RabbitMQ is the most widely deployed open source message broker

  • Implementation of the Advanced Message Queuing Protocol (AQMP)

  • AQMP is an open standard

Messaging

A message is a way of exchanging information between application, servers and processes. When two applications share data among themselves, they can decide when to react to it when they receive the data. To exchange data effectively, one application should be independent of another application. This independence part is where a message broker comes in.

Message Broker

A message broker is an application which stores messages for an application. Whenever an application wants to send data to another application, the app publishes the message onto the message broker. The message broker then finds out which queue this message belongs to, finds out the apps which are connected to that queue and so, those apps can now consume that message.

The message broker app, like RabbitMQ, is responsible for saving that message until there is a consumer for that message. Queues are just virtually infinite buffers which store message packets.

1.17.3.1. Install

Using Docker:

$ docker run -d -p 5462:5462 rabbitmq

Ubuntu or Debian package:

$ echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list
$ curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
$ sudo apt-get update
$ sudo apt-get install -y rabbitmq-server

1.17.3.2. Config

$ vim /etc/default/rabbitmq-server

1.17.3.3. Management Console

  • Manage users and their permissions and roles

  • Create new queues

  • Manage queues, monitor their consumption rate etc.

  • Purge data which is currently on queues

  • Send and receive messages

  • Memory usage against each queue and by the overall process

$ sudo rabbitmq-plugins enable rabbitmq_management
$ open http://localhost:15672/

Default credentials is:

  • username: guest

  • password: guest

Change this:

$ sudo rabbitmqctl add_user admin password
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

1.17.3.4. Manage RabbitMQ

Listing 1.58. Start the service
$ service rabbitmq-server start
Listing 1.59. Stop the service
$ service rabbitmq-server stop
Listing 1.60. Restart the service
$ service rabbitmq-server restart
Listing 1.61. Check the status
$ service rabbitmq-server status