Blog

Ideas and insights from our team

Dealing with resource-consuming tasks on Celery


In this post, we will talk about how you can optimize your Celery tasks and avoid certain kinds of problems related to resource-consuming tasks. If you are new to Celery or want to learn more before reading this, you might want to first check this post with an overview of Celery and this one with some more advanced tips and tricks.

When we are new to Celery, we tend to use its basic configuration after following the Celery first steps. This is fine if we have just a few tasks running on our cluster and they usually take the same time to be executed. However, as the number of tasks starts to grow and they get more and more complex, there are a few configuration options that we need to take care of if we want to avoid problems.

In this post, we are going to go through two problems that may happen if we don’t take care of our routes. The first is that a task that takes too long to execute, sharing a worker with quick tasks, may delay the quick tasks’ execution and even cause resource starvation, depending on the number of slow tasks. The second is that if too many tasks are waiting in the queue to be executed, the worker may run out of memory and even crash.

Before getting to the main goal of this post, let’s take a quick look at how to use multiple queues with Celery, how to select which workers will consume each queue, and how to route tasks between those queues. If you are already comfortable with routing tasks, feel free to jump to the next sections: Dealing with time-consuming tasks or Dealing with memory-consuming tasks.

Introduction to routing

When developing Celery tasks, we should always keep in mind that we are able to create multiple queues and distribute our tasks between them. To do that we need to follow three steps:

The first thing we need to do is to create the queue itself. There are two ways to do that: we can either do it automatically with the task_create_missing_queues setting (enabled by default), which creates any queue referenced in our routes that hasn’t been defined yet, or we can do it manually by defining the queues in the project settings. Here is an example of each:

1 - Creating queues automatically

CELERY_TASK_CREATE_MISSING_QUEUES = True

2 - Creating queues manually

from kombu import Exchange, Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('queue2', Exchange('queue2'), routing_key='queue2'),
)

The second step is to make the worker consume the newly defined queue. To do that, we can use the celery worker -Q option:

celery -A proj worker -Q queue2

Finally, we should route our tasks to the created queues. There are three ways to do that: in the settings, on the task definition, or on the apply_async call. Here is an example of each:

1 - Routing tasks in the settings.py

task_routes = {
    'path.to.the.new_task': {
        'queue': 'queue2',
        'routing_key': 'queue2'
    }
}

If you want to read more about how to set your task_routes, you can check this section of the Celery docs.

2 - Routing tasks on the task definition

@app.task(queue='queue2')
def new_task():
    ...

3 - Routing tasks on the apply_async call

from path.to.the import slow_task

slow_task.apply_async(queue='slow_queue')

That’s it. However, if you still want to read more about routing, you should check the Celery documentation.

Now that you are familiar with Celery routing, we can start to look at the problems and at how we can optimize our usage of Celery to manage them. Here are two tips on how to optimize the use of queues and avoid some issues.

Dealing with time-consuming tasks

Consider that we have two tasks being executed by the same worker: a slow_task and a quick_task. The slow_task takes 10 seconds to execute, and we receive a new one every second. The quick_task executes in 10 milliseconds, and we receive 10 new ones per second. In this situation, just during the time it takes to execute one slow_task, 10 new slow_tasks and 100 new quick_tasks would arrive. In a short period of time, we would have an immense number of tasks waiting to be executed, and some of them might “never” run, since tasks arrive much faster than they can be executed.
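The arithmetic behind that claim can be sketched with a few lines of Python, using the hypothetical rates from the scenario above:

```python
# Hypothetical rates from the scenario above (one worker, serial execution)
slow_rate, slow_duration = 1, 10.0      # 1 slow_task per second, 10 s each
quick_rate, quick_duration = 10, 0.01   # 10 quick_tasks per second, 10 ms each

window = 60  # look at one minute of incoming traffic

# Seconds of work arriving vs. seconds of work one worker can actually do
work_arriving = window * (slow_rate * slow_duration + quick_rate * quick_duration)
work_capacity = window  # one worker does 60 s of work in 60 s

print(f"work arriving in {window}s: {work_arriving:.0f}s")                 # 606s
print(f"backlog growth: {work_arriving - work_capacity:.0f}s per minute")  # 546s
```

Every minute, ten times more work arrives than the worker can process, so the queue grows without bound.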

To monitor your Celery usage and check for problems like the one in the example, we recommend using Flower, a web-based tool for monitoring Celery clusters. You can also check the Monitoring section of the more advanced tips and tricks post for more details on how to monitor your tasks.
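For reference, a minimal way to try Flower, assuming your Celery application module is called proj as in the earlier commands:

```shell
pip install flower
celery -A proj flower --port=5555
# then open http://localhost:5555 in a browser to see workers, queues and tasks
```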

The situation above will significantly delay the execution of some tasks. For instance, think of your customer waiting for that ‘recovery email’ that is taking hours to arrive. We may also be unable to execute all the needed tasks at all, since demand is far greater than supply (check the numbers above). Besides these problems, we usually want some tasks to be executed more quickly than others: the password recovery email should be sent before the monthly PDF report is generated, for example.

The idea to solve this problem has two parts. First, we need to add more workers, so we can solve the supply vs. demand problem. Then, we need to create different queues and routes, so the quicker tasks won’t need to wait for the slower ones.

To apply this solution we need to follow three steps. The first is to create a dedicated queue for the slow_task. As we saw in the previous section, we can create the queue in the project’s settings as follows:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('slow_queue', Exchange('slow_queue'), routing_key='slow_queue'),
)

The next step, after creating the new queue, is to start a new worker that consumes it, and to restrict the default worker to the default queue only. We can do that by running the following commands (giving each worker a unique name with -n):

celery -A [PROJECT_APP] worker -n [WORKER_NAME] -Q default
celery -A [PROJECT_APP] worker -n [WORKER_NAME] -Q slow_queue

Finally, we just need to route the slow_task to the correct worker. We can do this by setting up the task_routes on the project’s settings:

task_routes = {
    'path.to.the.slow_task': {
        'queue': 'slow_queue',
        'routing_key': 'slow_queue'
    }
}

Using this approach, the slow_tasks can take their time to execute on a separate worker, while the quick_tasks are executed as soon as they arrive at the default worker.

Dealing with memory-consuming tasks

Now that we’ve handled the first problem, we can talk about the second one. Let’s keep the same scenario from the first problem’s solution. We have the default_worker taking care of the quick_task and the slow_worker taking care of the slow_task. Previously, we said the slow_task would take 10 seconds to execute and that we received a new one every second. However, this time let’s take things a little bit further.

Let’s say that our slow_task makes a lot of database queries and processes the retrieved data in memory. Also, our database is becoming larger and larger and queries take some time to be executed. Now, our slow_task is taking 30 seconds to execute and we still receive a new one every second. In this new situation, in just one minute, we would have received 60 new slow_tasks and executed only 2.

The problem here is that our worker has a limited amount of memory to work with, and Celery prefetches tasks into memory to speed up execution. In this case, our slow_task uses a decent amount of memory during its execution, and that memory may not be available because Celery is prefetching too many tasks. In this situation, the worker can crash, which would be pretty bad for us.

To stop Celery from prefetching too many tasks and free the worker’s memory for our tasks, we can use the worker_prefetch_multiplier setting (CELERYD_PREFETCH_MULTIPLIER in older versions), which tells Celery how many tasks to prefetch into memory per worker process. Setting it to 1 tells Celery to reserve only one task per worker process at a time, freeing memory for the running task. If you want to read a little bit more about Celery prefetch limits, you can take a look here.
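A minimal sketch of this configuration, assuming the newer lowercase setting names in a standalone settings module:

```python
# celeryconfig.py -- reserve at most one task per worker process,
# instead of the default multiplier of 4
worker_prefetch_multiplier = 1

# Often combined with late acknowledgement: the message is only acked after
# the task finishes, so a task running when the worker dies is redelivered
# to another worker instead of being lost
task_acks_late = True
```

Note that this trades some throughput for memory safety: with less prefetching, workers spend more round trips fetching tasks from the broker.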

Okay, that’s everything for now. Let’s recap:

  • We should use routing to have a more balanced task load between workers;
  • We can use Flower to monitor our Celery cluster and check for problems related to its usage;
  • We should have a separate worker for tasks that take too long to execute;
  • When we have tasks that are way too memory-consuming, we should be careful with Celery prefetch limits.

If you still want to read more about how to optimize Celery tasks, there is a section in the Celery documentation.

Let me know in the comments in case I missed something!

Thanks to Hugo Bessa, Luca Bezerra and Álvaro Justen for reviewing this post.

About Arimatea Neto

I'm a passionate software developer and a Python enthusiast who loves challenges and to explore new technologies.
