Using Celery With Django for Background Task Processing

Web applications usually start out simple but can become quite complex, and most of them quickly exceed the responsibility of only responding to HTTP requests.

When that happens, you need to distinguish between what has to happen instantly (usually within the HTTP request lifecycle) and what can happen eventually. The distinction matters most under heavy traffic: every slow operation kept inside a request ties up the process serving it, so moving deferrable work out of the request keeps response times low.

Operations in a web application can be classified as either critical (request-time) operations or background tasks, which run outside the request. These map to the two categories described above:

  • needs to happen instantly: request-time operations
  • needs to happen eventually: background tasks

Request-time operations can be done on a single request/response cycle without worrying that the operation will time out or that the user might have a bad experience. Common examples include CRUD (Create, Read, Update, Delete) database operations and user management (Login/Logout routines).

Background tasks are different as they are usually quite time-consuming and are prone to failure, mostly due to external dependencies. Some common scenarios among complex web applications include:

  • sending confirmation or activity emails
  • crawling and scraping information from various sources every day and storing it
  • performing data analysis
  • deleting unneeded resources
  • exporting documents/photos in various formats

Background tasks are the main focus of this tutorial. The most common programming pattern used for this scenario is the Producer Consumer Architecture.

In simple terms, this architecture can be described like this:

  • Producers create data or tasks.
  • Tasks are put into a queue that is referred to as the task queue.
  • Consumers are responsible for consuming the data or running the tasks.

Usually, the consumers retrieve tasks from the queue in a first-in-first-out (FIFO) fashion or according to their priorities. The consumers are also referred to as workers, and that is the term we will be using throughout, as it is consistent with the terminology used by the technologies discussed.
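To make the pattern concrete, here is a minimal in-process sketch that uses only the Python standard library. It is not how Celery is implemented: Celery replaces the in-memory queue with a broker and runs workers as separate processes. The names square, worker, and run_demo are illustrative assumptions.

```python
import queue
import threading


def square(n):
    """Stand-in for a slow background job."""
    return n * n


def worker(task_queue, results):
    """Consumer: run tasks in arrival order until a None sentinel shows up."""
    while True:
        task = task_queue.get()
        try:
            if task is None:
                return
            func, arg = task
            results.append(func(arg))
        finally:
            # Mark the item as handled so task_queue.join() can return.
            task_queue.task_done()


def run_demo():
    task_queue = queue.Queue()  # FIFO task queue
    results = []
    consumer = threading.Thread(target=worker, args=(task_queue, results))
    consumer.start()

    # Producer: enqueue the tasks without waiting for them to run.
    for n in range(5):
        task_queue.put((square, n))
    task_queue.put(None)  # tell the worker to stop

    task_queue.join()  # block until every queued item has been processed
    consumer.join()
    return results
```

With a single worker, the results come back in the order the tasks were queued, which is the FIFO behaviour described above.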

What kind of tasks can be processed in the background? Tasks that:

  • are not essential for the basic functionality of the web application
  • can’t be run in the request/response cycle since they are slow (I/O intensive, etc.)
  • depend on external resources that might not be available or not behave as expected
  • might need to be retried at least once
  • have to be executed on a schedule

Celery is the de facto choice for doing background task processing in the Python/Django ecosystem. It has a simple and clear API, and it integrates beautifully with Django. It supports various technologies for the task queue and various paradigms for the workers.

In this tutorial, we’re going to create a Django toy web application (dealing with real-world scenarios) that uses background task processing.

Setting Things Up

Assuming you are already familiar with Python package management and virtual environments, let’s install Django:

$ pip install Django

I’ve decided to build yet another blogging application. The focus of the application will be on simplicity. A user can simply create an account and without too much fuss can create a post and publish it to the platform.

Set up the quick_publisher Django project:

$ django-admin startproject quick_publisher

Let’s get the app started:

$ cd quick_publisher 
$ ./manage.py startapp main

When starting a new Django project, I like to create a main application that contains, among other things, a custom user model. More often than not, I encounter limitations of the default Django User model. Having a custom User model gives us the benefit of flexibility.

# main/models.py

from django.db import models
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, BaseUserManager


class UserAccountManager(BaseUserManager):
    use_in_migrations = True

    def _create_user(self, email, password, **extra_fields):
        if not email:
            raise ValueError('Email address must be provided')

        if not password:
            raise ValueError('Password must be provided')

        email = self.normalize_email(email)
        user = self.model(email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_user(self, email=None, password=None, **extra_fields):
        return self._create_user(email, password, **extra_fields)

    def create_superuser(self, email, password, **extra_fields):
        extra_fields['is_staff'] = True
        extra_fields['is_superuser'] = True

        return self._create_user(email, password, **extra_fields)


class User(AbstractBaseUser, PermissionsMixin):

    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'

    objects = UserAccountManager()

    email = models.EmailField('email', unique=True, blank=False, null=False)
    full_name = models.CharField('full name', blank=True, null=True, max_length=400)
    is_staff = models.BooleanField('staff status', default=False)
    is_active = models.BooleanField('active', default=True)

    def get_short_name(self):
        return self.email

    def get_full_name(self):
        return self.email

    def __str__(self):
        return self.email

Make sure to check out the Django documentation if you are not familiar with how custom user models work.

Now we need to tell Django to use this User model instead of the default one. Add this line to the quick_publisher/settings.py file:

AUTH_USER_MODEL = 'main.User'

We also need to add the main application to the INSTALLED_APPS list in the quick_publisher/settings.py file. We can now create the migrations, apply them, and create a superuser to be able to log in to the Django admin panel:

$ ./manage.py makemigrations main 
$ ./manage.py migrate 
$ ./manage.py createsuperuser

Let’s now create a separate Django application that’s responsible for posts:

$ ./manage.py startapp publisher

Let’s define a simple Post model in publisher/models.py:

from django.db import models
from django.utils import timezone
from django.contrib.auth import get_user_model


class Post(models.Model):
    author = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)  # on_delete is required from Django 2.0 onwards
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')

    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

Hooking the Post model with the Django admin is done in the publisher/admin.py file like this:

from django.contrib import admin
from .models import Post


@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    pass

Finally, let’s hook the publisher application with our project by adding it to the INSTALLED_APPS list.

We can now run the server and head over to http://localhost:8000/admin/ and create our first posts so that we have something to play with:

$ ./manage.py runserver

I trust you’ve done your homework and you’ve created the posts.

Let’s move on. The next obvious step is to create a way to view the published posts.

# publisher/views.py

from django.http import Http404
from django.shortcuts import render
from .models import Post


def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Post does not exist")

    return render(request, 'post.html', context={'post': post})

Let’s associate our new view with a URL in: quick_publisher/urls.py

# quick_publisher/urls.py

from django.conf.urls import url
from django.contrib import admin

from publisher.views import view_post

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^(?P<slug>[a-zA-Z0-9-]+)', view_post, name='view_post')
]

Finally, let’s create the template that renders the post in: publisher/templates/post.html

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title></title>
</head>
<body>
    <h1>{{ post.title }}</h1>
    <p>{{ post.content }}</p>
    <p>Published by {{ post.author.full_name }} on {{ post.created }}</p>
</body>
</html>

We can now head to http://localhost:8000/the-slug-of-the-post-you-created/ in the browser. It’s not exactly a miracle of web design, but making good-looking posts is beyond the scope of this tutorial.

Sending Confirmation Emails

Here’s the classic scenario:

  • You create an account on a platform.
  • You provide an email address to be uniquely identified on the platform.
  • The platform checks you are indeed the owner of the email address by sending an email with a confirmation link.
  • Until you perform the verification, you are not able to (fully) use the platform.

Let’s add an is_verified flag and the verification_uuid on the User model:

# main/models.py
import uuid


class User(AbstractBaseUser, PermissionsMixin):

    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'

    objects = UserAccountManager()

    email = models.EmailField('email', unique=True, blank=False, null=False)
    full_name = models.CharField('full name', blank=True, null=True, max_length=400)
    is_staff = models.BooleanField('staff status', default=False)
    is_active = models.BooleanField('active', default=True)
    is_verified = models.BooleanField('verified', default=False) # Add the `is_verified` flag
    verification_uuid = models.UUIDField('Unique Verification UUID', default=uuid.uuid4)

    def get_short_name(self):
        return self.email

    def get_full_name(self):
        return self.email

    def __str__(self):
        return self.email

Let’s use this occasion to add the User model to the admin, in main/admin.py:

from django.contrib import admin
from .models import User


@admin.register(User)
class UserAdmin(admin.ModelAdmin):
    pass

Let’s make the changes reflect in the database:

$ ./manage.py makemigrations 
$ ./manage.py migrate

We now need to write a piece of code that sends an email when a user instance is created. This is what Django signals are for, and this is a perfect occasion to touch this subject.

Signals are fired before/after certain events occur in the application. We can define callback functions that are triggered automatically when the signals are fired. To make a callback trigger, we must first connect it to a signal.
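The connect-then-fire mechanism can be illustrated with a stripped-down stand-in written in plain Python. This is only a sketch of the idea, not Django's dispatcher: the Signal class below is a hypothetical simplification, and the User and Post classes are empty placeholders.

```python
class Signal:
    """Tiny stand-in for a signal: keeps receivers and calls them on send()."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver, sender=None):
        # Remember the callback and, optionally, the sender it cares about.
        self._receivers.append((receiver, sender))

    def send(self, sender, **kwargs):
        # Call every receiver connected to this sender (or to any sender).
        for receiver, wanted_sender in self._receivers:
            if wanted_sender is None or wanted_sender is sender:
                receiver(sender=sender, signal=self, **kwargs)


class User:
    pass


class Post:
    pass


post_save = Signal()
notified = []


def user_post_save(sender, instance, signal, **kwargs):
    # The callback only records the instance; a real one would send an email.
    notified.append(instance)


post_save.connect(user_post_save, sender=User)

post_save.send(sender=User, instance='alice@example.com')  # callback runs
post_save.send(sender=Post, instance='some post')          # ignored: other sender
```

Only the first send call reaches the callback, because it was connected with sender=User.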

We’re going to create a callback that will be triggered after a User model has been created. We’ll add this code after the User model definition in: main/models.py

from django.core.mail import send_mail
from django.db.models import signals
from django.urls import reverse


def user_post_save(sender, instance, signal, *args, **kwargs):
    # Runs after every save of a User; only unverified users get the email.
    if not instance.is_verified:
        # Send verification email
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
            'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(instance.verification_uuid)}),
            'from@example.com',  # placeholder sender address; use your own
            [instance.email],
            fail_silently=False,
        )

signals.post_save.connect(user_post_save, sender=User)

What we’ve done here is we’ve defined a user_post_save function and connected it to the post_save signal (one that is triggered after a model has been saved) sent by the User model.

Django doesn’t just send emails out on its own; it needs to be tied to an email service. For the sake of simplicity, you can add your Gmail credentials in quick_publisher/settings.py, or you can add your favourite email provider.

Here’s what Gmail configuration looks like:

EMAIL_USE_TLS = True
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = '<YOUR_GMAIL_USERNAME>@gmail.com'
EMAIL_HOST_PASSWORD = '<YOUR_GMAIL_PASSWORD>'
EMAIL_PORT = 587

To test things out, go into the admin panel and create a new user with a valid email address you can quickly check. If all went well, you’ll receive an email with a verification link. The verification routine is not ready yet.

Here’s how to verify the account:

# main/views.py

from django.http import Http404
from django.shortcuts import render, redirect
from .models import User


def home(request):
    return render(request, 'home.html')


def verify(request, uuid):
    try:
        user = User.objects.get(verification_uuid=uuid, is_verified=False)
    except User.DoesNotExist:
        raise Http404("User does not exist or is already verified")

    user.is_verified = True
    user.save()

    return redirect('home')

Hook the views up in: quick_publisher/urls.py

# quick_publisher/urls.py

from django.conf.urls import url
from django.contrib import admin

from publisher.views import view_post
from main.views import home, verify

urlpatterns = [
    url(r'^$', home, name='home'),
    url(r'^admin/', admin.site.urls),
    url(r'^verify/(?P<uuid>[a-z0-9-]+)/', verify, name='verify'),
    url(r'^(?P<slug>[a-zA-Z0-9-]+)', view_post, name='view_post')
]
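As a quick sanity check, the sketch below confirms that the string form of a uuid.uuid4() value (the default used for verification_uuid) is accepted by the verify pattern. It uses only the standard library and copies the regular expression from the URL configuration. The helper build_verify_path is a hypothetical name introduced for illustration.

```python
import re
import uuid

# Same regular expression as the 'verify' route in quick_publisher/urls.py.
VERIFY_PATTERN = re.compile(r'^verify/(?P<uuid>[a-z0-9-]+)/')


def build_verify_path(token):
    """Build the path (without the leading slash) that the URL resolver sees."""
    return 'verify/%s/' % token


token = uuid.uuid4()
match = VERIFY_PATTERN.match(build_verify_path(token))

# str(uuid4()) is lowercase hex digits separated by hyphens, so it matches,
# and the captured group parses back into the same UUID.
recovered = uuid.UUID(match.group('uuid'))
```

The captured value reaches the verify view as a string, and Django converts it when filtering on the UUIDField.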

Also, remember to create a home.html file under main/templates/home.html. It will be rendered by the home view.

Try to run the entire scenario all over again. If all is well, you’ll receive an email with a valid verification URL. If you follow the URL and then check in the admin, you can see that the account has been verified.

Sending Emails Asynchronously

Here’s the problem with what we’ve done so far. You might have noticed that creating a user is a bit slow. That’s because Django sends the verification email inside the request time.

This is how it works: we send the user data to the Django application. The application creates a User model and then creates a connection to Gmail (or another service you selected). Django waits for the response, and only then does it return a response to our browser.

Here is where Celery comes in. First, make sure it is installed:

$ pip install Celery

We now need to create a Celery application in our Django application:

# quick_publisher/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')

app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Celery is a task queue. It receives tasks from our Django application, and it will run them in the background. Celery needs to be paired with other services that act as brokers.

Brokers intermediate the sending of messages between the web application and Celery. In this tutorial, we’ll be using Redis. Redis is easy to install, and we can easily get started with it without too much fuss.

You can install Redis by following the instructions on the Redis Quick Start page. You’ll also need the Redis Python client (pip install redis) and the bundle that Celery needs to work with Redis (pip install celery[redis]).

Start the Redis server in a separate console like this: $ redis-server

Let’s add the Celery/Redis related configs into quick_publisher/settings.py:

# REDIS related settings 
REDIS_HOST = 'localhost' 
REDIS_PORT = '6379' 
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0' 
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600} 
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'

Before anything can be run in Celery, it must be declared as a task.

Here’s how to do this:

# main/tasks.py

import logging

from django.urls import reverse
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app


@app.task
def send_verification_email(user_id):
    UserModel = get_user_model()
    try:
        user = UserModel.objects.get(pk=user_id)
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
                'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(user.verification_uuid)}),
            'from@example.com',  # placeholder sender address; use your own
            [user.email],
            fail_silently=False,
        )
    except UserModel.DoesNotExist:
        logging.warning("Tried to send verification email to non-existing user '%s'" % user_id)

What we’ve done here is this: we moved the code that sends the verification email into a separate file called tasks.py.

A few notes:

  • The name of the file is important. Celery goes through all the apps in INSTALLED_APPS and registers the tasks in tasks.py files.
  • Notice how we decorated the send_verification_email function with @app.task. This tells Celery this is a task that will be run in the task queue.
  • Notice how the task takes a user_id argument rather than a User object. This is because we might have trouble serializing complex objects when sending the tasks to Celery. It’s best to keep the arguments simple.
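The serialization point can be checked with the standard library alone. The sketch below assumes the JSON serializer that recent Celery versions use by default. The User class here is a plain placeholder, not the Django model, and can_serialize is a hypothetical helper.

```python
import json


class User:
    """Plain placeholder object standing in for a model instance."""

    def __init__(self, pk, email):
        self.pk = pk
        self.email = email


def can_serialize(value):
    """Return True if the value can be encoded as JSON."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


user = User(pk=42, email='alice@example.com')

id_ok = can_serialize(user.pk)  # an integer primary key encodes fine
object_ok = can_serialize(user)  # an arbitrary object does not
```

Passing the primary key has a second benefit: the worker loads the row when the task actually runs, so it sees current data rather than a snapshot taken when the task was queued.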

Going back to main/models.py, the signal code turns into:

from django.db.models import signals
from main.tasks import send_verification_email


def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_verification_email.delay(instance.pk)

signals.post_save.connect(user_post_save, sender=User)

Notice how we call the .delay method on the task object. This sends the task off to Celery without waiting for the result. If we called send_verification_email(instance.pk) directly instead, the function would run synchronously in the current process, bypassing Celery entirely, and the request would block until the email was sent, which is not what we want.

Before you start creating a new user, there’s a catch. Celery is a service, and we need to start it. Open a new console, make sure you activate the appropriate virtualenv, and navigate to the project folder.

$ celery -A quick_publisher worker --loglevel=debug --concurrency=4

This starts four Celery worker processes. Now you can finally go and create another user. Notice how there’s no delay, and make sure to watch the logs in the Celery console to see whether the tasks are properly executed. The output should look something like this:

[2017-04-28 15:00:09,190: DEBUG/MainProcess] Task accepted: main.tasks.send_verification_email[f1f41e1f-ca39-43d2-a37d-9de085dc99de] pid:62065
[2017-04-28 15:00:11,740: INFO/PoolWorker-2] Task main.tasks.send_verification_email[f1f41e1f-ca39-43d2-a37d-9de085dc99de] succeeded in 2.5500912349671125s: None

Periodic Tasks With Celery

Here’s another common scenario. Most mature web applications send their users lifecycle emails in order to keep them engaged. Some common examples of lifecycle emails:

  • monthly reports
  • activity notifications (likes, friendship requests, etc.)
  • reminders to accomplish certain actions (“Don’t forget to activate your account”)

Here’s what we’re going to do in our app. We’re going to count how many times every post has been viewed and send a daily report to the author. Once every single day, we’re going to go through all the users, fetch their posts, and send an email with a table containing the posts and view counts.

Let’s change the Post model so that we can accommodate the view counts scenario.

class Post(models.Model):
    author = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
    view_count = models.IntegerField("View Count", default=0)

    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

As always, when we change a model, we need to migrate the database:

$ ./manage.py makemigrations
$ ./manage.py migrate

Let’s also modify the view_post Django view to count views:

def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Post does not exist")
    
    post.view_count += 1
    post.save()

    return render(request, 'post.html', context={'post': post})

It would be useful to display the view_count in the template. Add this <p>Viewed {{ post.view_count }} times</p> somewhere inside the publisher/templates/post.html file. Do a few views on a post now and see how the counter increases.

Let’s create a Celery task. Since it is about posts, I’m going to place it in publisher/tasks.py:

from django.template import Template, Context
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
from publisher.models import Post


REPORT_TEMPLATE = """
Here's how you did till now:

{% for post in posts %}
        "{{ post.title }}": viewed {{ post.view_count }} times |

{% endfor %}
"""


@app.task
def send_view_count_report():
    for user in get_user_model().objects.all():
        posts = Post.objects.filter(author=user)
        if not posts:
            continue

        template = Template(REPORT_TEMPLATE)

        send_mail(
            'Your QuickPublisher Activity',
            template.render(context=Context({'posts': posts})),
            'from@example.com',  # placeholder sender address; use your own
            [user.email],
            fail_silently=False,
        )

Every time you make changes to the Celery tasks, remember to restart the Celery process. Celery needs to discover and reload tasks. Before creating a periodic task, we should test this out in the Django shell to make sure everything works as intended:

$ ./manage.py shell 

In [1]: from publisher.tasks import send_view_count_report 

In [2]: send_view_count_report.delay()

Hopefully, you received a nifty little report in your email.

Let’s now create a periodic task. Open up quick_publisher/celery.py and register the periodic tasks:

# quick_publisher/celery.py

import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')

app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

app.conf.beat_schedule = {
    'send-report-every-single-minute': {
        'task': 'publisher.tasks.send_view_count_report',
        'schedule': crontab(),  # change to `crontab(minute=0, hour=0)` if you want it to run daily at midnight
    },
}

So far, we created a schedule that would run the task publisher.tasks.send_view_count_report every minute as indicated by the crontab() notation. You can also specify various Celery Crontab schedules.
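For reference, here are a few other schedules expressed with crontab. This is a configuration sketch rather than something to paste in as-is: the entry names are hypothetical, and in the project the dictionary would be assigned to app.conf.beat_schedule. Times are interpreted in Celery's configured timezone, which is UTC unless changed.

```python
from celery.schedules import crontab

# Hypothetical entry names; the task path is the one registered above.
beat_schedule = {
    'report-daily-at-midnight': {
        'task': 'publisher.tasks.send_view_count_report',
        'schedule': crontab(minute=0, hour=0),
    },
    'report-every-15-minutes': {
        'task': 'publisher.tasks.send_view_count_report',
        'schedule': crontab(minute='*/15'),
    },
    'report-monday-morning': {
        'task': 'publisher.tasks.send_view_count_report',
        # day_of_week=1 is Monday (0 is Sunday)
        'schedule': crontab(minute=30, hour=7, day_of_week=1),
    },
    'report-first-of-month': {
        'task': 'publisher.tasks.send_view_count_report',
        'schedule': crontab(minute=0, hour=0, day_of_month='1'),
    },
}
```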

Open up another console, activate the appropriate environment, and start the Celery Beat service.

$ celery -A quick_publisher beat

The Beat service’s job is to push tasks into the Celery queue according to the schedule. Keep in mind that, with this setup, the send_view_count_report task runs every minute. That’s good for testing but not recommended for a real-world web application.

Making Tasks More Reliable

Tasks are often used to perform unreliable operations, operations that depend on external resources or that can easily fail due to various reasons. Here’s a guideline for making them more reliable:

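  • Make tasks idempotent. An idempotent task can run more than once with the same arguments and leave the system in the same state as a single successful run, which makes it safe to retry. It also helps to make tasks atomic: if a task is stopped midway, it should either make its full changes to the system or none at all.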
  • Retry the tasks. If the task fails, it’s a good idea to try it again and again until it’s executed successfully. You can do this in Celery with Celery Retry. One other interesting thing to look at is the Exponential Backoff algorithm. This could come in handy when thinking about limiting unnecessary load on the server from retried tasks.
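The retry idea can be sketched in plain Python, independent of Celery. The helper names backoff_delays and retry_with_backoff, along with their default values, are assumptions chosen for illustration. In a Celery task, the computed delay would typically be passed as the countdown when retrying.

```python
import time


def backoff_delays(retries, base=1.0, factor=2.0, max_delay=60.0):
    """Delays of base, base*factor, base*factor**2, ... capped at max_delay."""
    return [min(base * factor ** attempt, max_delay) for attempt in range(retries)]


def retry_with_backoff(func, retries=5, base=1.0, factor=2.0,
                       max_delay=60.0, sleep=time.sleep):
    """Call func, waiting longer after each failure; give up after `retries` retries."""
    for delay in backoff_delays(retries, base, factor, max_delay):
        try:
            return func()
        except Exception:
            # Wait before the next attempt; the wait grows exponentially.
            sleep(delay)
    # Final attempt: let any exception propagate to the caller.
    return func()
```

Doubling the wait after each failure gives a struggling external service time to recover instead of hammering it with immediate retries. The sleep parameter is injectable so the logic can be exercised without actually waiting.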

Conclusions

I hope this has been an interesting tutorial for you and a good introduction to using Celery with Django.

Here are a few conclusions we can draw:

  • It’s good practice to keep unreliable and time-consuming tasks outside the request time.
  • Long-running tasks should be executed in the background by worker processes (or other paradigms).
  • Background tasks can be used for various tasks that are not critical for the basic functioning of the application.
  • Celery can also handle periodic tasks using the celery beat service.
  • Tasks can be more reliable if made idempotent and retried (maybe using exponential backoff).