Dealing with resource-consuming tasks on Celery

Arimatea Neto
March 28, 2018
<p>In this post, we will talk about how you can optimize your Celery tasks and avoid certain kinds of problems related to resource-consuming tasks. If you are new to Celery or want to know more about it before reading this, you might want to check <a href="">this post</a> first, with an overview of Celery, and <a href="">this one</a>, with some more advanced tips and tricks.</p><p>When we are new to Celery, we tend to use its basic configuration after following the <a href="">Celery first steps</a>. This is fine if we have just a few tasks running on our cluster and they usually take about the same time to execute. However, as the number of tasks starts to grow and they get more and more complex, there are a few configuration options we need to take care of if we want to avoid problems.</p><p>In this post, we are going to go through two of the problems that may happen if we don’t take care of our routes. The first is that a task that takes too long to execute, sharing a worker with quick tasks, may delay the quick tasks’ execution and even cause resource starvation, depending on the number of slow tasks. The second is that if we have too many tasks waiting in the queue to be executed, we may run into problems with the worker’s memory, even making it crash.</p><p>Before getting to the main goal of this post, let’s take a quick look at how to use multiple queues with Celery, how to select which workers will consume each queue and how to route tasks between those queues. If you are already comfortable with routing tasks, feel free to jump to the next sections: Dealing with time-consuming tasks or Dealing with memory-consuming tasks.</p><h1 id="introduction-to-routing">Introduction to routing</h1><p>When developing Celery tasks, we should always keep in mind that we can create multiple queues and distribute our tasks between them. 
To do that, we need to follow three steps.</p><p>The first thing we need to do is to create the queue itself. There are two ways to do that: we can either do it automatically by enabling the <code>task_create_missing_queues</code> setting, which makes Celery create any queue referenced in the routes for us, or we can do it manually by defining the queues in the project settings. Here is an example of each:</p><p>1 - Creating queues automatically</p><pre><code>app.conf.task_create_missing_queues = True
</code></pre><p>2 - Creating queues manually</p><pre><code>from kombu import Exchange, Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('queue2', Exchange('queue2'), routing_key='queue2'),
)
</code></pre><p>The second step is to make a worker consume the newly defined queue. To do that, we can use the <code>celery worker -Q</code> option:</p><pre><code>celery -A proj worker -Q queue2
</code></pre><p>Finally, we should route our tasks to the created queues. We have three options to do that: in the settings, on the task definition or on the <code>apply_async</code> call. Here is an example of each:</p><p>1 - Routing tasks in the settings</p><pre><code>task_routes = {
    'proj.tasks.new_task': {
        'queue': 'queue2',
        'routing_key': 'queue2'
    }
}
</code></pre><p>If you want to read more about how to set your <code>task_routes</code>, you can check <a href="">this</a> section of the Celery docs.</p><p>2 - Routing tasks on the task definition</p><pre><code>@app.task(queue='queue2')
def new_task():
    ...
</code></pre><p>3 - Routing tasks on the <code>apply_async</code> call</p><pre><code>from proj.tasks import slow_task

slow_task.apply_async(queue='slow_queue')
</code></pre><p>That’s it. 
However, if you still want to read more about routing, you should check <a href="">the Celery documentation</a>.</p><p>Now that you are familiar with Celery and routing, we can start to look at the problems and how we can optimize our usage of Celery to manage them. Here are two tips on how to optimize the use of queues and avoid some issues.</p><h1 id="dealing-with-time-consuming-tasks">Dealing with time-consuming tasks</h1><p>Consider that we have two tasks being executed by the same worker: a <code>slow_task</code> and a <code>quick_task</code>. The <code>slow_task</code> takes 10 seconds to execute and we receive a new one every second. The <code>quick_task</code> executes in 10 milliseconds and we receive 10 new ones per second. In this situation, just during the time it takes to execute one <code>slow_task</code>, we would have 10 new <code>slow_tasks</code> and 100 new <code>quick_tasks</code> waiting. If you stop to think about these numbers, in a short period of time we would have an immense number of tasks waiting to be executed, and some of them might “never” be executed due to the creation/execution rate.</p><p>To monitor your Celery usage and check for problems like the one in the example, we recommend using <a href="">Flower</a>, a web-based tool for monitoring Celery clusters. You can also check the Monitoring section of the <a href="">more advanced tips and tricks post</a> for more details on how to monitor your tasks.</p><p>The situation above will significantly delay the execution of some tasks. For instance, think of your customer waiting for that ‘recovery email’ while it takes hours to arrive. We may also not be able to execute all the needed tasks, since the demand is way greater than the supply (check the numbers above). Besides these problems, we also want some tasks to be executed more quickly than others. 
The password recovery email should be sent before the monthly PDF report is generated, for example.</p><p>The idea to solve this problem has two parts. First, we need to add more workers, so we can solve the supply vs. demand problem. Then, we need to create different queues and routes, so the quicker tasks won’t have to wait for the slower ones.</p><p>To apply this solution, we need to follow three steps. The first thing we have to do is create a specific queue for the <code>slow_task</code> to be executed in. As we saw in the previous section, we can create the queue in the project’s <code>settings</code> as follows:</p><pre><code>from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('slow_queue', Exchange('slow_queue'), routing_key='slow_queue'),
)
</code></pre><p>The next step, after creating the new queue, is to start a new worker that will consume it and set the default worker to consume only the default queue. We can do that by running the following commands:</p><pre><code>celery -A [PROJECT_APP] worker -n [WORKER_NAME] -Q default
</code></pre><pre><code>celery -A [PROJECT_APP] worker -n [WORKER_NAME] -Q slow_queue
</code></pre><p>Finally, we just need to route the <code>slow_task</code> to the correct worker. We can do this by setting up the <code>task_routes</code> in the project’s settings:</p><pre><code>task_routes = {
    'proj.tasks.slow_task': {
        'queue': 'slow_queue',
        'routing_key': 'slow_queue'
    }
}
</code></pre><p>Using this approach, we would have the <code>slow_tasks</code> taking their time to execute in a separate worker and the <code>quick_tasks</code> being executed as soon as they arrive at the default worker.</p><h1 id="dealing-with-memory-consuming-tasks">Dealing with memory-consuming tasks</h1><p>Now that we’ve handled the first problem, we can talk about the second one. Let’s keep the same scenario from the first problem’s solution. 
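<p>It is worth sanity-checking the arrival vs. execution numbers with a quick back-of-the-envelope calculation. The sketch below uses the rates from the earlier example (a 10-second <code>slow_task</code> arriving once per second, processed serially by a single worker):</p>

```python
# Back-of-the-envelope backlog estimate for one worker processing tasks
# serially; the rates mirror the slow_task example above.
def backlog_after(seconds, arrival_rate, service_time):
    """Tasks still waiting after `seconds`, with `arrival_rate` new tasks
    per second and `service_time` seconds to execute each one."""
    arrived = seconds * arrival_rate
    executed = seconds // service_time  # tasks fully completed, one at a time
    return max(0, arrived - executed)

# 10-second tasks arriving once per second: after one minute,
# 60 have arrived and only 6 have finished.
print(backlog_after(60, arrival_rate=1, service_time=10))  # -> 54
```

<p>The backlog only grows over time, which is why routing the slow tasks to their own worker matters.</p>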
We have the <code>default_worker</code> taking care of the <code>quick_task</code> and the <code>slow_worker</code> taking care of the <code>slow_task</code>. Previously, we said the <code>slow_task</code> would take 10 seconds to execute and that we received a new one every second. This time, however, let’s take things a little further.</p><p>Let’s say that our <code>slow_task</code> makes a lot of database queries and processes the retrieved data in memory. Also, our database is growing larger and larger, and queries take some time to execute. Now our <code>slow_task</code> takes 30 seconds to execute and we still receive a new one every second. In this new situation, in just one minute we would have received 60 new <code>slow_tasks</code> and executed only 2.</p><p>The problem here is that our worker has a limited amount of memory to work with, and Celery prefetches tasks into memory to make execution faster. However, in this case, our <code>slow_task</code> uses a fair amount of memory during its execution, and that memory may not be available because Celery is prefetching too many tasks. In this situation, the worker could crash, which would be pretty bad for us.</p><p>To stop Celery from prefetching too many tasks and free the worker’s memory for our tasks, we can use the <code>worker_prefetch_multiplier</code> setting (<code>CELERYD_PREFETCH_MULTIPLIER</code> in the old setting names), which tells Celery how many tasks should be prefetched into memory at a time. Setting it to 1 tells Celery to reserve only one task per worker process at a time, freeing memory for the running task. If you want to read a little more about Celery prefetch limits, you can take a look <a href="">here</a>.</p><p>Okay, that’s everything for now. 
Let’s recap:</p><ul><li>We should use routing to have a more balanced task load between workers;</li><li>We can use <a href="">Flower</a> to monitor our Celery cluster and check for problems related to its usage;</li><li>We should have a separate worker for tasks that take too long to execute;</li><li>When we have tasks that are very memory-consuming, we should be careful with Celery’s prefetch limits.</li></ul><p>If you still want to read more about how to optimize Celery tasks, there is <a href="">a section</a> in the Celery documentation.</p><p>Let me know in the comments in case I missed something!</p><p>Thanks to <a href="">Hugo Bessa</a>, <a href="">Luca Bezerra</a> and <a href="">Álvaro Justen</a> for reviewing this post.</p>