Gur Raunaq Singh Raunaq is a software engineer from India. He has been working on large-scale backend applications and has a keen interest in DevOps technologies.

Optimizing task queues with Celery and Flask

5 min read 1650

Celery Logo Over a Green Yellow and Blue Background

If you’ve stumbled upon this article, chances are you’re familiar with Flask and you’re working on adding a feature to your web app that takes quite a few seconds (if not more) to execute. Maybe you want to know if there is a better or faster way to do so.

Some common examples include:

  • Calling a third-party API to fetch some data based on user input
  • Sending an email to the user on Sign Up
  • Generating a PDF report

These types of tasks block the request/response cycle until it completes, meaning the user would need to wait a while.

To offload long-running tasks like these, you can use Celery, which provides a mechanism to offload these tasks to separate worker threads.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker.

The most commonly used brokers are Redis and RabbitMQ. We’ll set up a Redis server locally to make use of this mechanism.

Prerequisites

Additionally, intermediate knowledge of Python and Flask is expected. Everything else will be explained as the article progresses.

Setting up the project

Download the starter project and set it up using the following commands:

git clone -b step_1 https://github.com/raunaqness/flask_celery_tutorial.git
cd flask_celery_tutorial

# make virtualenv
virtualenv v
source v/bin/activate

# install dependencies
pip install -r requirements.txt

# start server
export FLASK_APP=app; python -m flask run

Open http://127.0.0.1:5000/ in your browser, and, if everything works well, you should be able to see “Hello, world!”.

Hello World Browser

We made a custom demo for .
No really. Click here to check it out.

Next, let’s add a route that will contain a Button that, when clicked, will trigger a mock long-running task, such as sending an email, generating a PDF report, calling a third-party API, etc.

We’ll mock this API by using time.sleep(), which will block the running of the application for 15 seconds.

Open app.py and add the following block of code.

# route that will show will simply render an HTML template
@app.route("/tasks")
def tasks():
    return render_template("tasks.html")

# route that will execute a long-running task
@app.route("/long_running_task")
def long_running_task():
    # time in seconds 
    time_to_wait = 15

    print(f"This task will take {time_to_wait} seconds to complete...")
    time.sleep(time_to_wait)

    return f"<p>The task completed in {time_to_wait} seconds!"

Make sure to import the time module by adding the following, along with the import statements at the top of the file:

import time

Next, create a directory named templates in the root of the project. Inside that, create a new file named tasks.html and add the following:

<!DOCTYPE html>
<html>

<head>
    <title>Tasks</title>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />

    <!-- Bootstrap CSS -->
    <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-1BmE4kWBq78iYhFldvKuhfTAU6auU8tT94WrHftjDbrCEXSU1oBoqyl2QvZ6jIW3" crossorigin="anonymous" />
</head>

<body>
    <script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.bundle.min.js"
        integrity="sha384-ka7Sk0Gln4gmtz2MlQnikT1wXgYsOg+OMhuP+IlRH9sENBO0LRn5q+8nbTov4+1p"
        crossorigin="anonymous"></script>

    <div>
        <a class="btn btn-primary" href="/long_running_task" role="button">Trigger Long Running Task</a>
    </div>
</body>

</html>

Your project structure should look something like this:

code
├── __pycache__
│   └── app.cpython-38.pyc
├── app.py
├── requirements.txt
└── templates
    └── tasks.html

2 directories, 4 files

Back in the terminal, stop and restart the Flask server again, then open http://127.0.0.1:5000/tasks in your browser. You should see the tasks.html page rendered with a single button.

Trigger Long-Running Task

Now, when you click on the Trigger Long-Running Task button, it will execute to the route /long_running_task, which will execute the function def long_running_task() as defined in the app.py file.

Notice that the page will be in the “loading” state for 15 seconds, so your application is stuck in that state and cannot perform any other operation until the current one is complete.

After 15 seconds, you should see the task completed, and the expected response in the browser.

Task Completed

Also, note that you’ll be able to see the print statements in the terminal window while the long-running task is being executed.

Print Statements

Now, let’s see how we can use Celery to run this task in the background.

In case you had any problems, you should be able to see the current state of your project here.

Setting up Celery and Redis

You have already installed the Celery python package in the initial setup. To confirm the installation of the package, you can run pip freeze in your terminal window with the virtualenv activated to see all the packages installed.

Pip Freeze

Next, you need to install Redis Server on your local machine. You can find the official installation instructions here.

Now, let’s set up Celery.

Getting started with Celery

Create a new file in the project root called celery_utils.py. This will be used to initialize the Celery app instance, similar to how we have a Flask app initialized in app.py. Add the following code to the file:

from celery import Celery

# celery config
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'

# initialize celery app
def get_celery_app_instance(app):
    celery = Celery(
        app.import_name,
        backend=CELERY_BROKER_URL,
        broker=CELERY_BROKER_URL
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

Here’s a brief explanation:

  • The Celery python package is imported
  • The function, get_celery_app_instance, is defined, which will return an instance of Celery, which in turn requires the following parameters for initialization:
    • name: this is the name of the Celery worker
    • backend: this is the URL of the backend to be used, which, in this case, is Redis, and the host URL is defined with variable CELERY_BROKER_URL
    • broker: similar to the backend, it’s required to define the URL of the broker, which is also the Redis server
  • <explain ContextTask>
  • <return instance of celery app>

Next, let’s use Celery to define a long-running task. Make the following changes in app.py:

Add the following near the import statements.

# importing function to get celery app instance
from celery_utils import get_celery_app_instance

Add the following after the statements initializing the Flask app:

# celery app instance
celery = get_celery_app_instance(app)

Next, add the following block of code toward the bottom of the file:

# celery tasks
@celery.task
def sending_email_with_celery():
    print("Executing Long running task : Sending email with celery...")
    time.sleep(15)
    print("Task complete!")

Here, we simply defined a function named sending_email_with_celery(), which will mock the functionality of sending an email that could take 15 seconds to complete.

However, to make this function run as a background task, the decorator @celery.task is added on the line just above the function definition.

If you’re not familiar with decorators in Python, here’s a good article to get started.

Finally, define a route to trigger this function:

# route to trigger celery task
@app.route("/long_running_task_celery")
def long_running_task_celery():
    # function.delay() is used to trigger function as celery task
    sending_email_with_celery.delay()
    return f"Long running task triggered with Celery! Check terminal to see the logs..."

In this code block, we define the route /long_running_task_celery, which triggers a function as a Celery task. Notice that the function is called by using the delay() method.

This indicates that we want to run this function as a Celery task, not as a regular Python function.

Finally, to see this in action, let’s add another button in tasks.html to trigger this function.

<div>
    <a class="btn btn-primary" href="/long_running_task" role="button">Trigger Long Running Task</a>
</div>

<!-- new code -->
<div>
    <a class="btn btn-primary" href="/long_running_task_celery" role="button">Trigger Long Running Task with Celery</a>
</div>

Note: Here’s the GitHub repo for this section.

Time to see it in action!

Make sure you have the Flask server running in a terminal window.

Terminal Window Flask

In another terminal window, cd to the root of the project and run the following command to start the Celery worker.

celery -A app.celery worker --loglevel=info

Celery Worker

Open http://127.0.0.1:5000/tasks in your browser, where you should see two buttons:

  1. Triggers a long-running function with Python
  2. Triggers a long-running function with Celery

We’ve already seen that if we trigger a long-running function with Python, the server is stuck until the execution of that function is complete.

Now, if you click on the button Trigger Long-Running Task with Celery, you’ll see that the page instantly redirects to the route /long_running_task_celery, and you’ll see an expected output in the browser window.

Triggered With Celery

In the background, the execution of the function is being done by Celery. To see the logs of the function running, switch to the terminal window where you started the Celery worker. It should look something like this:

Celery Worker Logs

Conclusion

That’s it! You now know how to set up and run long-running tasks with Celery in your Flask web application. Here’s a quick recap. To run a function as a Celery task, you need to:

: Full visibility into your web apps

LogRocket is a frontend application monitoring solution that lets you replay problems as if they happened in your own browser. Instead of guessing why errors happen, or asking users for screenshots and log dumps, LogRocket lets you replay the session to quickly understand what went wrong. It works perfectly with any app, regardless of framework, and has plugins to log additional context from Redux, Vuex, and @ngrx/store.

In addition to logging Redux actions and state, LogRocket records console logs, JavaScript errors, stacktraces, network requests/responses with headers + bodies, browser metadata, and custom logs. It also instruments the DOM to record the HTML and CSS on the page, recreating pixel-perfect videos of even the most complex single-page and mobile apps.

.
  1. Import the instance of Celery app in your file
  2. Add decorator @celery.task on top of the function definition
  3. Run the function using the function_name.delay() method
Gur Raunaq Singh Raunaq is a software engineer from India. He has been working on large-scale backend applications and has a keen interest in DevOps technologies.

Leave a Reply