Everyone in the Python community has heard about Celery at least once, and may even have worked with it already. Basically, it’s a handy tool that helps run postponed or dedicated code in a separate process, or even on a separate computer or server. This saves time and effort on many levels.
Celery decreases performance load by running part of the functionality as postponed tasks either on the same server as other tasks, or on a different server. Most commonly, developers use it for sending emails. However, Celery has a lot more to offer. In this article, I’ll show you some Celery basics, as well as a couple of Python-Celery best practices.
If you have worked with Celery before, feel free to skip this chapter. But if Celery is new to you, here you will learn how to enable Celery in your project, and you can follow a separate tutorial on using Celery with Django. Basically, you need to create a Celery instance and use it to mark Python functions as tasks.
It’s better to create the Celery instance in a separate module, because the worker process needs to import it, much the same way a WSGI server imports the application object in Django. For example, if you create both the Flask and Celery instances in a single file of a Flask application, then every process that runs that file will create both instances but use only one of them. It’s the same when you run Celery.
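Here is a minimal sketch of such a module; the module name, app name, and broker URL are placeholders, so point the broker at your own Redis or RabbitMQ instance:

```python
# celery_app.py -- a minimal sketch; names and broker URL are placeholders.
from celery import Celery

app = Celery("myproject", broker="redis://localhost:6379/0")


@app.task
def add(x, y):
    # The @app.task decorator turns a plain function into a Celery task.
    return x + y
```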
As I mentioned before, the go-to use case for Celery is sending email. I will use this example to show you the basics of using Celery. Here’s a quick Celery Python tutorial:
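Below is a sketch of what such a task might look like in a Django project; the task name, template path, and email addresses are illustrative:

```python
# tasks.py -- a sketch; task name, template path, and addresses are
# illustrative.
import logging
import smtplib

from celery import shared_task
from django.core.mail import send_mail
from django.template.loader import render_to_string

logger = logging.getLogger(__name__)


@shared_task
def send_welcome_email(user_email, username):
    # Render the template before entering the try/except block.
    body = render_to_string("emails/welcome.html", {"username": username})
    try:
        send_mail("Welcome!", body, "noreply@example.com", [user_email])
    except smtplib.SMTPException:
        logger.exception("Failed to send welcome email to %s", user_email)
```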
This code uses Django, as it’s our main framework for web applications. By using Celery, we reduce the response time for the customer, as we separate the email sending from the main code responsible for returning the response.
The simplest way to execute this task is to call the delay method of the function, which is provided by the app.task decorator.
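For example, in a Django view, assuming the task sketched above and a user object at hand:

```python
# In a view: schedule the task instead of sending the email inline.
send_welcome_email.delay(user.email, user.username)
```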
And that’s not all: Celery provides more benefits. For example, we can set up retries upon failure.
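Here is the same sketch with a retry added; the ten-minute delay and the cap of three attempts are illustrative values:

```python
@shared_task(bind=True, default_retry_delay=10 * 60)  # retry after 10 minutes
def send_welcome_email(self, user_email, username):
    body = render_to_string("emails/welcome.html", {"username": username})
    try:
        send_mail("Welcome!", body, "noreply@example.com", [user_email])
    except smtplib.SMTPException as exc:
        # Re-schedule this task; max_retries caps the number of attempts.
        raise self.retry(exc=exc, max_retries=3)
```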
Now the task will be restarted after ten minutes if sending fails. You can also set the number of retries with the max_retries argument.
Some of you may wonder why I moved the template rendering outside of the send_mail call. It’s because we wrap the send_mail call in a try/except block, and it’s better to have as little code inside a try/except as possible.
Celery makes it possible to run tasks by schedulers like crontab in Linux.
First of all, if you want to use periodic tasks, you have to run the Celery worker with the --beat flag, otherwise Celery will ignore the scheduler. Your next step would be to create a config that says which task should be executed and when. Here’s an example:
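A sketch of such a config, assuming the standard Django integration with the CELERY settings namespace; the entry name and task path are illustrative:

```python
# settings.py -- the entry name and task path are illustrative.
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "monday-morning-report": {
        "task": "myapp.tasks.send_report",
        "schedule": crontab(hour=7, minute=0, day_of_week=1),
    },
}
```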
*if you don’t use Django, you should use celery_app.conf.beat_schedule instead of the CELERY_BEAT_SCHEDULE setting
What we have in this configuration is just one task that will be executed every Monday at 7 a.m. The root key is the name of the cron job, not the task itself.
You can add arguments to tasks and choose what should be done in case the same task should run at different times with different arguments. The crontab method supports the syntax of the system crontab, such as crontab(minute='*/15'), to run the task every 15 minutes.
You can also put tasks in a Python Celery queue with a timeout before execution, for example, when you need to send a notification after an action. To do this, use the apply_async method with one of these arguments:

- eta: execute the task at an exact date and time
- countdown: execute the task in N seconds
Let’s look at what it might look like in code:
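A sketch, reusing the email task from the earlier snippets; the user object and the date are illustrative:

```python
from datetime import datetime, timezone

# Send the email in 15 minutes.
send_welcome_email.apply_async(
    args=(user.email, user.username),
    countdown=60 * 15,
)

# Send the email at 7 a.m. on May 20; eta expects an absolute,
# preferably timezone-aware, datetime (the year is illustrative).
send_welcome_email.apply_async(
    args=(user.email, user.username),
    eta=datetime(2025, 5, 20, 7, 0, tzinfo=timezone.utc),
)
```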
In the first example, the email will be sent in 15 minutes, while in the second it will be sent at 7 a.m. on May 20.
Celery can be distributed when you have several workers on different servers that use one message queue for task planning. You can configure an additional queue for your task/worker. For example, sending emails is a critical part of your system and you don’t want any other tasks to affect the sending. Then you can add a new queue, let’s call it mail, and use this queue for sending emails.
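A sketch of the routing config, again assuming the Django integration with the CELERY settings namespace; the task path is illustrative:

```python
# settings.py -- route the email task to the dedicated "mail" queue.
CELERY_TASK_ROUTES = {
    "myapp.tasks.send_welcome_email": {"queue": "mail"},
}
```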
*if you don’t use Django, use celery_app.conf.task_routes instead of the CELERY_TASK_ROUTES setting
Run two separate Celery workers, one for the default queue and one for the new queue:
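For example (myproject is a placeholder for your app module):

```bash
celery -A myproject worker -Q celery --loglevel=INFO
celery -A myproject worker -Q mail --loglevel=INFO
```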
The first line will run the worker for the default queue, called celery, and the second line will run the worker for the mail queue. By the way, if you run a worker without the -Q argument, then this worker will use all configured queues.
Sometimes, I have to deal with tasks written to go through database records and perform some operations. Quite often, developers forget about data growth, which can lead to a very long task running time. It’s always better to write tasks like these in a way that allows working with data in chunks. The easiest way is to add offset and limit parameters to the task: they indicate the size of the chunk and act as a cursor for fetching the next chunk of data.
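A sketch of such a chunked task; the model, the chunk size, and the handle_user helper are illustrative:

```python
from celery import shared_task
from django.contrib.auth.models import User


def handle_user(user):
    ...  # hypothetical per-record operation


@shared_task
def process_users(offset=0, limit=1000):
    # Fetch one chunk; a stable ordering is required for this to be safe.
    users = list(User.objects.order_by("id")[offset:offset + limit])
    for user in users:
        handle_user(user)
    # A full chunk means there are probably more records, so schedule
    # the task again with the next offset.
    if len(users) == limit:
        process_users.delay(offset + limit, limit)
```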
This is a very simple example of how a task like this can be implemented. At the end of the task, we check how many users we found in the database. If the number equals the limit, then we’ve probably got new users to process. So we run the task again, with a new offset. If the user count is less than the limit, it means it’s the last chunk and we don’t have to continue. Beware, though: this task implementation needs to have the same ordering for records every time.
Most developers don’t make use of the results they get back after running a task. Imagine that you can take a part of the code, assign it to a task, and execute this task independently as soon as you receive a user request. When we need the results of the task, we either get them right away (if the task has completed) or wait for it to finish. Then we include the result in the general response. Using this approach, you can decrease response time, which is very good for your users and site rank.
We use this feature to run simultaneous operations. In one of our projects, we have a lot of user data and a lot of service providers. To find the best service provider, we do heavy calculations and checks. To do it faster, we create one calculation task per service provider for the user, run them all, and collect the results to show to the user. It’s very easy to do with Celery task groups.
First, why do we even run two tasks? We use the second task to form the group of calculation tasks, launch it, and return it. On top of that, the second task is where you can apply filtering, such as which service providers should be calculated for a given user. All this can be done while Celery is doing other work. When the task group returns, the result of the first task is actually the calculation we are interested in.
Here’s an example of how to use this approach in code:
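This is a sketch under assumptions: compute_offer and get_provider_ids_for are hypothetical helpers, and the GroupResult is saved so the caller can restore it by its id later:

```python
from celery import group, shared_task
from celery.result import GroupResult


def compute_offer(user_id, provider_id):
    ...  # hypothetical heavy calculation


def get_provider_ids_for(user_id):
    ...  # hypothetical filtering of relevant providers


@shared_task
def calculate_offer(user_id, provider_id):
    # The heavy per-provider calculation.
    return compute_offer(user_id, provider_id)


@shared_task
def calculate_offers(user_id):
    # The second task: filter the providers for this user and fan out
    # a group of calculation tasks.
    provider_ids = get_provider_ids_for(user_id)
    result = group(
        calculate_offer.s(user_id, pid) for pid in provider_ids
    ).apply_async()
    result.save()  # make the GroupResult restorable by its id
    return result.id


def view_offers(user):
    # In a view: launch as early as possible, do other work, then
    # collect the results at the end.
    group_id = calculate_offers.delay(user.id).get()
    return GroupResult.restore(group_id).get()
```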
Here, we run calculations as soon as possible, wait for the results at the end of the method, then prepare the response and send it to the user.
I’ve probably already mentioned that I use database record IDs as task arguments instead of full objects. This is a good way to decrease the message queue size. But what’s more important is that by the time a task is executed, the data in the database may have changed. And when you have only IDs, you will get fresh data, as opposed to the outdated data you get when passing objects.
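A sketch of the pattern; deactivate_user is a hypothetical task:

```python
from celery import shared_task
from django.contrib.auth.models import User


@shared_task
def deactivate_user(user_id):
    # Fetch the record inside the task to see its current, fresh state.
    user = User.objects.get(pk=user_id)
    user.is_active = False
    user.save()


# Caller: deactivate_user.delay(user.pk) -- pass the id, not the object.
```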
Sometimes, issues may arise when an executed task can’t find an object in the database. Why does this happen? In Django, for instance, you may want to run tasks right after a user is registered, like sending a greeting email, while your Django settings wrap all requests in transactions (ATOMIC_REQUESTS). In Celery, however, tasks can start fast, before the transaction has even finished. So if you use Celery when working with Django, you might see that the user doesn’t exist in the database (yet).
To deal with this, you can Google “task transaction implementation”. In general, the idea is a task class with an overridden apply_async method that schedules the task in a transaction.on_commit callback instead of sending it immediately.
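A sketch of that idea; the task and model are illustrative, and note that in this sketch apply_async no longer returns an AsyncResult:

```python
from celery import Task, shared_task
from django.contrib.auth.models import User
from django.db import transaction


class TransactionAwareTask(Task):
    # Publish the task only after the surrounding database transaction
    # commits, so the worker is guaranteed to see the committed rows.
    def apply_async(self, *args, **kwargs):
        transaction.on_commit(
            lambda: super(TransactionAwareTask, self).apply_async(*args, **kwargs)
        )


@shared_task(base=TransactionAwareTask)
def send_greeting_email(user_id):
    # By the time this runs, the transaction has committed and the row exists.
    user = User.objects.get(pk=user_id)
    ...
```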
As you can see, Celery has a lot more uses than just sending emails. You can run different tasks simultaneously using the main process, and while you do your job, Celery will complete the smaller tasks at hand. You can set up queues, work with data chunks on long-running tasks, and schedule when your tasks should be executed. This will allow you to better plan your work progress, plan development time more efficiently, and spend your precious time working on the bigger things while Celery task groups work their magic.