Building Powerful Distributed Applications with Celery in Python


What is Celery?

Celery is a distributed task queue for Python that allows you to run tasks asynchronously across multiple worker nodes. It is used to handle long-running and resource-intensive tasks, such as translating an entire library, processing images, sending emails or generating reports. Celery uses a message broker to send and receive messages between the client and the workers, which enables the system to distribute tasks and scale horizontally.
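To make the client, broker, and worker roles concrete, here is a toy in-process model built only on the standard library. It is not how Celery is implemented: a queue.Queue stands in for the message broker, threads stand in for worker nodes, and the function name run_toy_task_queue is made up for this illustration.

```python
import queue
import threading


def run_toy_task_queue(jobs, num_workers=2):
    """Push jobs onto a shared queue and let worker threads process them."""
    broker = queue.Queue()   # stands in for the message broker
    results = {}             # stands in for a result backend
    lock = threading.Lock()

    def worker():
        while True:
            job = broker.get()
            if job is None:  # sentinel: no more work for this worker
                return
            job_id, func, args = job
            value = func(*args)
            with lock:
                results[job_id] = value

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for job in jobs:         # the "client" enqueues work and moves on
        broker.put(job)
    for _ in threads:        # one sentinel per worker
        broker.put(None)
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    # Each job is (job_id, function, arguments).
    print(run_toy_task_queue([(i, pow, (i, 2)) for i in range(5)]))
```

Celery follows the same shape, but the queue lives in an external broker such as Redis, so clients and workers can run in separate processes or on separate machines.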

Prerequisites

To follow this tutorial, you should have a basic understanding of Python and its package management system, pip. You’ll also need to have the following software installed on your machine:

  • Python 3
  • pip
  • Redis (a server running locally on the default port, 6379)

Step 1: Install Celery

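The first step is to install Celery using pip. Because this tutorial uses Redis as the broker, install Celery with the redis extra, which also pulls in the Python Redis client. Open a command prompt or terminal and run the following command:

pip install "celery[redis]"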

Step 2: Create a Celery Project

Next, create a new Python project for your Celery application. In your terminal, navigate to the directory where you want to create your project and run the following commands:

mkdir celery_project
cd celery_project

Inside the celery_project directory, create a new file called tasks.py. This file will contain the Celery tasks that you want to run asynchronously.

Step 3: Define Celery Tasks

In the tasks.py file, define your Celery tasks. For example, let’s create a task that generates a report:

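from celery import Celery, Task

# The broker carries task messages; the result backend stores task states and
# return values, which update_state() and AsyncResult rely on.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

class GenerateReportTask(Task):
    name = "generate_report_task"

    def run(self, report_id):
        # Publish a custom state so clients can see that work has started.
        self.update_state(state='PROGRESS', meta={'report_id': report_id, 'status': 'Generating report...'})
        # code to generate report
        return f'Report {report_id} generated successfully!'

# register_task() adds the instance to the registry and binds it to the app.
app.register_task(GenerateReportTask())

In this example, we create a new Celery app and specify Redis as both the message broker and the result backend. The backend is where task states and return values are stored; without one, update_state and AsyncResult.get have nowhere to write to or read from. We also define a class GenerateReportTask which inherits from Celery’s Task class. Its run method generates the report and publishes a progress state using the update_state method.

We also set the name attribute to “generate_report_task”, which is the name under which the task is registered in Celery. Because run is an ordinary method, self is already the task instance, so no bind option is needed. The bind=True option only applies to functions wrapped with the @app.task decorator, and a class attribute named bind would shadow Task.bind, the classmethod Celery uses to attach a task to an app.

Lastly, we register an instance of GenerateReportTask with the app using the app.register_task method, which is how class-based tasks are registered in Celery 4 and later.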

Step 4: Start a Celery Worker

To run the Celery tasks asynchronously, you need to start a worker. Open a new terminal window and navigate to the celery_project directory. Then, run the following command:

celery -A tasks worker --loglevel=info -c 4

This starts a Celery worker that listens for tasks to be run. The -c option sets the concurrency level to 4, which means that the worker starts 4 pool processes and can run up to 4 tasks at the same time. The --loglevel=info option sets the log level to info, which shows informative messages about the worker’s status and the tasks it is executing.

Step 5: Run a Celery Task

To run a Celery task, you need to call it asynchronously from your Python code. In a new Python file, import the Celery app and look up the registered task by its name:

from tasks import app

task = app.tasks['generate_report_task']

Then, call the task asynchronously using the apply_async method:

report_id = 42  # example value
result = task.apply_async(args=[report_id])

The apply_async method returns an AsyncResult object, which you can use to check the status of the task, get the result when it is finished, or handle any errors that occur.
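apply_async also accepts execution options, and delay is a shorthand for the common case with no options. The helper below is a small sketch of both; enqueue_report and its delay_seconds parameter are hypothetical names, while countdown is the apply_async option that postpones execution by a number of seconds.

```python
def enqueue_report(task, report_id, delay_seconds=0):
    """Queue the report task, optionally postponing its start."""
    if delay_seconds:
        # countdown asks the worker to wait this many seconds before running.
        return task.apply_async(args=[report_id], countdown=delay_seconds)
    # delay(*args) is shorthand for apply_async(args=args).
    return task.delay(report_id)
```

For example, enqueue_report(task, 42, delay_seconds=10) would queue the report now and have a worker start it roughly ten seconds later.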

Step 6: Handle Task Results Using AsyncResult

To handle the result of a Celery task, you can use the AsyncResult object that is returned by the apply_async method. The AsyncResult object has several methods that you can use to get the status of the task or retrieve its result.

For example, you can use the get method to retrieve the result of a completed task. This requires a result backend to be configured on the app:

result = task.apply_async(args=[report_id])
result.get(timeout=30)  # blocks until the task finishes; raises celery.exceptions.TimeoutError after 30 seconds

You can also use the status property on the same AsyncResult to check the status of the task without blocking:

result.status  # returns the current status, e.g. 'PENDING', 'PROGRESS' or 'SUCCESS'
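When blocking in get is not convenient, the result can be polled instead. The helper below is a sketch under the assumption that it is given an AsyncResult (or any object with the same ready, failed, state, and result members); wait_for_result and its parameters are hypothetical names.

```python
import time


def wait_for_result(result, timeout=30.0, poll_interval=0.5):
    """Poll an AsyncResult-like object until it is ready or the timeout expires."""
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {result.state} after {timeout} seconds")
        time.sleep(poll_interval)
    if result.failed():
        # For a failed task, result.result holds the exception that was raised.
        raise RuntimeError(f"task failed: {result.result!r}")
    return result.result
```

Polling keeps control in the caller's hands, for example to update a progress indicator between checks, at the cost of one backend lookup per iteration.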

Step 7: Update Task Status

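To update the status of a Celery task, you can use the update_state method inside your task. Its two main arguments are state and meta, and the call needs a result backend to store them in.

The state argument should be a string that represents the current state of the task. Custom states such as “PROGRESS” are the typical use; Celery sets the built-in “SUCCESS” and “FAILURE” states itself when the task returns or raises, so you rarely need to set those by hand. The meta argument can be any object that the result serializer can encode (JSON by default) and provides additional information about the task.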

For example, you can update the status of the GenerateReportTask task to show the progress of the report generation:

class GenerateReportTask(Task):
    name = "generate_report_task"

    def run(self, report_id):
        self.update_state(state='PROGRESS', meta={'report_id': report_id, 'status': 'Generating report...'})
        # code to generate report
        self.update_state(state='PROGRESS', meta={'report_id': report_id, 'status': 'Finalizing report...'})
        # code to finish the report
        # Returning marks the task as SUCCESS and stores the return value as its result.
        return f'Report {report_id} generated successfully!'

In this example, we call update_state with the “PROGRESS” state and a meta dictionary that contains the report_id and the current status of the report generation, then call it again as the work advances. We do not set “SUCCESS” ourselves: when run returns, Celery marks the task as “SUCCESS” and stores the return value as the result, replacing any meta written for that state by hand.
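On the calling side, the meta dictionary passed to update_state is available through the info attribute of the AsyncResult while the task is in a custom state. The function below is a sketch of how a client might turn that into a status line; describe_progress is a hypothetical helper, and the keys it reads are the ones used in the meta dictionaries above.

```python
def describe_progress(result):
    """Return a one-line summary of a task's current state."""
    state = result.state
    info = result.info  # meta dict for custom states, return value on success
    if state == "PROGRESS" and isinstance(info, dict):
        return f"report {info.get('report_id')}: {info.get('status')}"
    if state == "SUCCESS":
        return f"done: {info}"
    if state == "FAILURE":
        return f"failed: {info!r}"  # info is the raised exception
    return state  # e.g. PENDING, before a worker has picked the task up
```

Calling describe_progress(result) in a loop gives a simple progress display without blocking on the final result.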

Step 8: Change Concurrency

You can change the concurrency level of a running Celery worker by sending it a remote control command through the broker. Open a new terminal window and run the following command:

celery -A tasks control pool_grow 4

This broadcasts the pool_grow command to the running workers. Each worker that receives it adds 4 processes to its pool, raising the concurrency level from 4 to 8. The matching pool_shrink command removes processes again.
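Worker pools can also be resized from Python through app.control, which provides pool_grow and pool_shrink as remote control commands. The helper below is a sketch that computes the difference for you; resize_pool and its current and target parameters are hypothetical, and the caller is assumed to know the current pool size.

```python
def resize_pool(control, current, target, destination=None):
    """Grow or shrink worker pools by the difference between target and current."""
    delta = target - current
    if delta > 0:
        return control.pool_grow(delta, destination=destination)
    if delta < 0:
        return control.pool_shrink(-delta, destination=destination)
    return None  # already at the requested size
```

For example, resize_pool(app.control, current=4, target=8) would ask every worker to add 4 processes, while passing destination as a list of worker hostnames limits the change to those workers.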

In this tutorial, you learned how to create Celery tasks that run asynchronously, start a Celery worker, and run tasks and inspect their results using AsyncResult. You also learned how to update the status of a task and change the concurrency level of a Celery worker. With this knowledge, you can create distributed applications that handle long-running tasks with Celery and Python.