
Celery in the wild: tips and tricks to run async tasks in the real world


This post is aimed at people with some experience writing async tasks. If you are just getting started with Celery, you might want to read this other post I wrote before diving into this one.

The thing about async tasks is that the hard part is not how to run them [although it can be fairly complicated to understand the architecture and set things up when you are just starting] but what to do when they fail. How do you deal with bugs? How do you recover from unexpected shutdowns? What do you do when external services do not behave as expected? Those are the questions you will be asking yourself when trying to build reliable tasks. This post covers some best practices that help prevent errors, mitigate risks, facilitate debugging, increase robustness and simplify recovery.

We need to accept that software is imperfect and integrations are problematic, so we need to write code that knows how to deal with these problems.

Before getting into the actual code, we need to talk about two very important concepts: Idempotency and Atomicity.

Idempotency

"... [idempotency] is the property of certain operations in mathematics and computer science, that can be applied multiple times without changing the result beyond the initial application." - Wikipedia

Multiplying by zero and multiplying by one are two examples of idempotent operations: once you've multiplied a number by zero, no matter how many times you do it again, the result will always be zero.

3 x 0 = 0 x 0 = 0 x 0 = 0

It's the same for multiplying by one: no matter how many times you multiply a number by one, the result will never change.

3 x 1 = 3 x 1 = 3 x 1 = 3

Let's see the same concept applied to HTTP. Which methods are supposed to be idempotent?

GET, POST, PUT, DELETE

GETting a resource should not produce changes to it, so GET is idempotent. POST is used to create a resource (at least that is what it used to do before GraphQL, but anyway…). Every POST request produces a new change to the application state, so POST is not idempotent: you cannot expect subsequent POST requests to leave the application state unchanged. PUT is a great example of an idempotent operation: the first PUT request you make produces a change in the state, but if you keep repeating the same request, no further changes are expected. DELETE is a bit of a grey area. From the application state perspective it is idempotent. From the HTTP response perspective, the first call might return a 204 status code while subsequent calls return 404. So it depends on how you look at it.
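To make this concrete, here's a minimal sketch using the requests library (the endpoint and payload are made up for illustration):

import requests

# Repeating the same PUT any number of times leaves user 42 in the same state.
requests.put('https://api.example.com/users/42', json={'name': 'Ana'})
requests.put('https://api.example.com/users/42', json={'name': 'Ana'})

# Each POST creates a brand new user, so repeating it changes the state every time.
requests.post('https://api.example.com/users', json={'name': 'Ana'})
requests.post('https://api.example.com/users', json={'name': 'Ana'})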

Atomicity

An atomic operation "... is an indivisible and irreducible series of database operations such that either all occur, or nothing occurs." - Wikipedia

Despite being commonly associated with database operations, the concept of atomicity can also be applied in other contexts.

@app.task
def update_data(user_id):
    user = User.objects.get(id=user_id)
    user.status = 'updated'
    user.save()
    r = facebook_request()  # if this call fails, the user is left half-updated
    user.name = r.name
    user.save()

In the above code we have a task that sets the user status to 'updated', saves it, makes a request to Facebook and only then updates the user's name. This is not atomic: if the request fails, we are left with an inconsistent state in the database (a user with status='updated' but a name that was never updated). The right way to do this is to first make the request and then update the user's status and name at the same time.

@app.task
def update_data(user_id):
    r = facebook_request()

    if r.status != 200:
        return  # nothing was written yet, so the database stays consistent

    user = User.objects.get(id=user_id)
    user.name = r.name
    user.status = 'updated'
    user.save()

Now the operation is atomic: either everything succeeds or everything fails. Another way to improve atomicity is to write short tasks.

@app.task
def newsletter():
    users = all_users()
    for u in users:
        send(u.email, 'newsletter')

In this example we are iterating over all users and sending the emails in a single task. If one of the sends fails, the task will stop: part of your users will receive the email while others will never see it. Even worse, you will be left with no information about which of them did not receive the message.

@app.task
def send_newsletter(email):
    send(email, 'newsletter')

@app.task
def newsletter():
    users = all_users()
    for u in users:
        send_newsletter.delay(u.email)

In this refactored example we iterate over the users again but delegate the sending to another task. If one of the emails fails, it will affect only one user. Another advantage is that it's easy to identify and rerun the tasks that failed. Beware that there's an overhead to creating and initializing tasks; making them too fine-grained may harm performance, so keep this in mind when designing. Also notice that those examples are not idempotent; the sketch below shows one way to fix that.
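Here's a minimal sketch of how send_newsletter could be made idempotent, assuming a hypothetical SentEmail model that records past deliveries (the model and its fields are made up for illustration):

@app.task
def send_newsletter(email):
    # SentEmail is a hypothetical model tracking which addresses already got this campaign.
    if SentEmail.objects.filter(email=email, campaign='newsletter').exists():
        return  # already delivered; rerunning the task is now a no-op
    send(email, 'newsletter')
    SentEmail.objects.create(email=email, campaign='newsletter')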

Cool, now that the concepts of idempotency and atomicity are fresh in our minds, we can move on to strategies and Celery-specific features that will help you write better async tasks.

Retrying

As we discussed at the beginning of the article, we need to recognize that tasks are error prone and write them in a way that acknowledges that. This is exactly why idempotency and atomicity are so important: tasks that fulfil both criteria can be rerun any number of times with no risk of creating inconsistencies.

There are two main ways for things to fail in a task: bugs in your code and failures in integrations with external systems. In the former case, you can deploy a fix and then run the task manually again. In the latter case, a good solution is to keep retrying until the external service recovers. For that, Celery provides a retry method that can be called inside a task, which lets you specify in which situations it should automatically re-execute. Here is how to do it:

from tapioca.exceptions import TapiocaException
from tapioca_facebook import Facebook

@app.task(bind=True, max_retries=4, default_retry_delay=10)
def likes_do_facebook(self):
    api = Facebook(access_token=ACCESS_TOKEN)
    try:
        api.user_likes(id='me').get()
    except TapiocaException as e:
        self.retry(exc=e)

In the example, we try to fetch the user's likes from Facebook (using the tapioca-facebook client; check out the tapioca project to learn more about what you can do with it). If it fails, we wait 10 seconds and retry. If your tasks are idempotent and atomic, you should have no problem calling retry as many times as needed until the task succeeds. If they are not, retrying may produce inconsistencies [or end up spamming your users if you are sending emails].

There are some other caveats to retrying. In the previous example, the request to Facebook might be failing because Facebook is under attack, so it might be a good idea to back off and give it some space to recover. One way to do this is to make the interval between retries grow exponentially, which increases the chances of things being back to normal by the next execution. It's also wise to throw a random factor into the amount of time you wait before retrying: imagine 1000 tasks failing and all of them retrying at exactly the same time. That flood of requests might be the very reason the system you are interacting with went down in the first place. This kind of backoff can be manually implemented via the countdown argument of the retry method:

import random

def exponential_backoff(task_self):
    # Base delay in minutes, with a random jitter factor between 1 and 1.3.
    minutes = task_self.default_retry_delay / 60
    rand = random.uniform(minutes, minutes * 1.3)
    # Grow exponentially with the retry count, then convert back to seconds.
    return int(rand ** task_self.request.retries) * 60

# in your task code:
self.retry(exc=e, countdown=exponential_backoff(self))

If you are using Celery 4, you can cut down a few lines of code by passing an autoretry_for parameter and its companion retry_backoff (the latter is only available from Celery 4.1 on). This will automatically retry with exponential backoff when those errors arise.

from tapioca.exceptions import TapiocaException
from tapioca_facebook import Facebook

@app.task(autoretry_for=(TapiocaException,), retry_backoff=True)
def likes_do_facebook():
    api = Facebook(access_token=ACCESS_TOKEN)
    api.user_likes(id='me').get()

acks_late

acks_late is also something you should know about. By default, Celery acknowledges a task message just before executing it; this prevents a task from running twice in case of an unexpected shutdown. This is a sane default because we cannot guarantee that every task every developer writes can safely run twice. But if you proactively write idempotent and atomic tasks, turning on the task_acks_late setting will not harm your application and will instead make it more robust: a task interrupted mid-execution will be redelivered and run again. If not all tasks can be configured that way, you can set acks_late per task:

@app.task(acks_late=True)
def my_task():
    …
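If all your tasks qualify, the global version is a one-liner (assuming your Celery instance is named app, as in the other examples):

# Acknowledge messages only after the task finishes, so interrupted tasks are redelivered.
app.conf.task_acks_late = True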

Time limiting

Another common issue is tasks that take too long to execute because of a bug or network latency. To prevent this from harming the performance of your application, you can set the task_time_limit setting (or the time_limit option on individual tasks): Celery will interrupt the task if it takes longer than the time you set. In case you need to do some recovery before the task is interrupted, also set a soft time limit (task_soft_time_limit globally, soft_time_limit per task). When it runs out, Celery raises SoftTimeLimitExceeded and you can do some cleanup before the task is killed.

from celery.exceptions import SoftTimeLimitExceeded

@app.task(time_limit=60, soft_time_limit=50)
def mytask():
    try:
        possibly_long_task()
    except SoftTimeLimitExceeded:
        recover()

Monitoring

We talked a lot about how to actively handle errors. But we all know that bugs are inevitable. Bugs in a web application are easier to spot: when something breaks, your user will get a 500 page and [many times] will find a way to let you know that something is not working for them. For async tasks this is not always the case. In many situations you'll be dealing with things that do not directly or explicitly affect the user experience. This means that you should be extra careful monitoring tasks, so you can capture errors early and not when it's already too late.

Starting from the basics: logging. Make sure you log as much as possible. This will help you trace what went wrong when bugs arise. As usual, be careful not to expose sensitive information in logs, as this might be a security threat to your users' data. Logging is general advice for any kind of application, but it's especially important for tasks since there is no user interface for debugging them.

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(a, b):
    logger.info('Adds {0} + {1}'.format(a, b))
    return a + b

Another very important thing is to make sure people get notified when things fail. Tools such as Sentry and Opbeat can be easily integrated with Django and Celery and will help you monitor errors. You can also integrate them with Slack so you get a notification every time something goes wrong. Make sure you fine-tune what produces notifications: too many false positives and your team will stop paying attention and let actual errors pass unnoticed. Have a well-defined process to deal with those errors, and make sure each new one is added to the backlog and prioritized accordingly.
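As an illustration, here's roughly what that wiring looks like with the sentry-sdk package and its Celery integration (the DSN is a placeholder; treat this as a sketch of one possible setup, not the exact configuration used in our projects):

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

# Placeholder DSN; use the one from your Sentry project settings.
sentry_sdk.init(
    dsn='https://<key>@sentry.example.com/<project-id>',
    integrations=[CeleryIntegration()],
)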

Besides that, there are plenty of other tools that you can [and should] be using to monitor tasks. Here are two we use in most of our projects:

  • Flower is a tool for monitoring Celery tasks in real time. It allows you to inspect which tasks are running and keep track of the executed ones. It's a standalone application and definitely worth using for bigger systems.
  • django-celerybeat-status will add a page to the Django admin interface where you will be able to see all scheduled tasks along with their next ETA. The crontab API is sometimes confusing and might lead to mistakes. This will give you a little more confidence that you got things right.

Testing and debugging

Testing and debugging tasks is normally harder than what we are used to in regular web applications, but there are a few things we can do to mitigate this. task_always_eager is a setting that comes in very handy for testing and debugging: when set to True, whenever you call delay or apply_async the task simply runs synchronously instead of being delegated to a worker. This simplifies debugging in your local environment and facilitates automated testing. Just make sure it's not activated in production.
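For instance, a minimal sketch of a test/debug configuration (again assuming your Celery instance is named app; task_eager_propagates is a companion setting that re-raises exceptions from eagerly run tasks):

# Run tasks synchronously in the calling process instead of sending them to a worker.
app.conf.task_always_eager = True
# Re-raise task exceptions immediately so failures surface in tests.
app.conf.task_eager_propagates = True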

Alright, that's all for today. Let's recap:

  • Write idempotent and atomic tasks. You want to be able to freely rerun tasks.
  • Backoff when you retry. Other systems may need some space to recover.
  • Make extensive use of monitoring tools. Tasks are harder to debug, so collect as much information as you can.
  • Get notified if something fails. But be careful to notify only about the important things.
  • Use task_always_eager for testing and debugging.

The last thing I want to show is the Celery Tasks Checklist, which has a compilation of the things we talked about in this post plus a lot of other references and tips. You can click through the items as you verify what is missing in your tasks. It's also an open source project, so you are very welcome to contribute and improve it.

Let me know in the comments in case I missed something!

Thanks to David Baumgold and Arimatea for reviewing this post!

About Filipe Ximenes

Bike enthusiast, software developer and former director at Python Brasil Association. Likes open source and how people interact in open source communities.
