Django 实战:Celery 异步任务从环境搭建到调用全掌握

简介: 本文详解 Celery 核心概念、架构组成及工作流程,并实战演示如何在 Django 项目中集成 Celery,实现异步任务调用与事务提交控制,助你掌握从配置到部署的全流程开发技巧。

一、Celery入门

介绍

Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery是实现异步任务、定时任务的一种工具。

Celery 的核心功能

  • 异步任务处理:将耗时的任务异步执行,不阻塞主程序,从而提高系统的响应速度和扩展性。例如邮件发送、消息推送等。
  • 定时任务调度:可以按照预设的时间间隔或特定时间点执行任务,例如定时清理日志、定时统计数据等。
  • 分布式任务执行:支持在多台服务器上运行 worker 进程,扩展到分布式环境中。
  • 任务状态跟踪和结果存储:可以跟踪任务的执行状态,并将任务的执行结果存储。

Celery 的架构

  • 消息中间件 (Broker):负责接收任务生产者发送的消息并将任务存入队列。常用的消息中间件有 Redis 和 RabbitMQ
  • 任务执行单元 (Worker):执行任务的实际工作进程,会从消息队列中取出任务并执行 。
  • 任务结果存储 (Backend):用于存储任务执行结果,可以是 Redis、RabbitMQ 或数据库 。
  • 任务调度器 (Beat):用于调度定时任务,会周期性地将到期需要执行的任务发送给消息队列 。

Celery 的工作流程

  1. 任务生产者将任务发送到消息队列。
  2. 消息队列存储任务,直到任务消费者获取它们。
  3. 任务消费者从消息队列中获取任务,并在本地执行。
  4. 执行完成后,任务结果存储到结果存储后端。
  5. 任务生产者可以通过 AsyncResult 查询任务的状态和结果。

生产环境建议

  • Windows 平台上安装Celery,只能用于开发环境或测试环境。生产环境,建议使用 Linux 平台。

安装

安装Redis(过程略)

安装 Celery 和 Redis客户端(作为消息中间件)

pip install redis
pip install celery

二、Celery与Django集成实战

配置Celery实例

Django项目结构示例

- mysite/
  - manage.py
  - mysite/
    - __init__.py
    - settings.py
    - urls.py

定义 Celery 实例:创建文件mysite\mysite\celery.py

"""定义和配置 Celery 实例"""

import os
from celery import Celery
from django.conf import settings


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
# 创建 Celery 实例
app = Celery("mysite")
# 加载配置文件中的 Celery 配置
app.config_from_object("django.conf:settings", namespace="CELERY")
# 自动发现并加载任务
app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)

配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py

"""Django 启动时加载Celery实例"""

from .celery import app as celery_app

__all__ = ("celery_app",)

配置Celery:修改Django项目文件settings.py

### Celery配置
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"
CELERY_BROKER_URL = "redis://localhost:6379/3"
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
CELERY_RESULT_EXTENDED = True  # 启用后才会记录 task_name、date_started 等字段
CELERY_TASK_TRACK_STARTED = True  # 记录任务开始时间

定义任务

发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构

- myapp_system/
    - tasks.py
    - models.py
- myapp_infra/
    - tasks.py
    - models.py

定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务

"""定义 Celery 任务"""

from celery import shared_task
from time import sleep

@shared_task
def send_email_task(subject, message, recipient_list):
    """
    发送电子邮件的任务
    """
    print("发送邮件任务开始执行...")
    sleep(3)
    print(f"主题: {subject}")
    print(f"内容: {message}")
    print(f"收件人: {recipient_list}")
    print("#" * 10, "\n")

调用任务

调用任务:在视图或其他代码中,使用 .delay() 方法将任务发送到 Celery 队列中。

from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import send_email_task

class SendEmailView(APIView):
    def get(self, request):
        subject = "Hello from Celery"
        message = "This is a test email sent using Celery."
        recipient_list = ["user@example.com"]

        # 异步发送邮件
        send_email_task.delay(subject, message, recipient_list)

        return Response({
   "message": "Email sent successfully"})

启动

启动Celery工作进程:在开发和测试环境中,使用下面命令启动Celery工作进程

# 进入Django项目目录(包含manage.py的目录)
celery -A mysite worker -l INFO -P solo

启动Django项目

# 进入Django项目目录(包含manage.py的目录)
python manage.py runserver

实战效果

当访问SendEmailView 视图,会看到日志中显示任务已触发和执行。

[2025-04-14 09:11:53,151: INFO/MainProcess] Task mybooks.tasks.send_email_task[c37a8725-aa59-43b4-9949-74a753c019a2] received
[2025-04-14 09:11:55,185: WARNING/MainProcess] 发送邮件任务开始执行...
[2025-04-14 09:11:58,186: WARNING/MainProcess] 主题: Hello from Celery
[2025-04-14 09:11:58,186: WARNING/MainProcess] 内容: This is a test email sent using Celery.
[2025-04-14 09:11:58,186: WARNING/MainProcess] 收件人: ['user@example.com']
[2025-04-14 09:11:58,187: WARNING/MainProcess] ##########
[2025-04-14 09:11:58,187: WARNING/MainProcess]

三、delay_on_commit

使用场景

delay_on_commit 是 Celery 提供的一个用于确保任务在数据库事务提交后执行的机制。考虑以下场景:

  • send_email任务可能会在视图将事务提交到数据库之前启动,因此任务可能无法找到用户。
  • 如果事务回滚,任务仍然会执行,处理一个不存在或无效的订单。
# views.py
def create_user(request):
    user = User.objects.create(username=request.POST['username'])
    send_email.delay(user.pk)
    return HttpResponse('User created')

# task.py
@shared_task
def send_email(user_pk):
    user = User.objects.get(pk=user_pk)
    # send email ...

使用方法

delay_on_commit 会将任务调度延迟到当前事务成功提交后执行。如果事务回滚,任务也不会被调度。

# views.py
from django.db import transaction
from .tasks import send_email_task

@transaction.atomic
def create_user(request):
    user = User.objects.create(username="zhangsan")  # 数据库操作

    # 正确:事务提交后才会触发任务
    send_email_task.delay_on_commit(user.id)

    # 如果此处抛出异常导致事务回滚,任务不会被执行
    return HttpResponse("OK")

点击查看完整代码


您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

相关文章
|
4月前
|
数据挖掘 数据库 Python
Django实战:基于Django和openpyxl实现Excel导入导出功能
`openpyxl` 是用于处理 Excel 文件的 Python 库。本文详解其在 Django 项目中的实战应用,涵盖 Excel 文件的生成、下载、上传与解析。
148 0
Django实战:基于Django和openpyxl实现Excel导入导出功能
|
4月前
|
监控 NoSQL 数据可视化
Django+Celery 进阶:Flower可视化监控与排错
本文介绍了Celery命令行工具与图形监控工具的使用,涵盖查看Worker状态、任务信息及集成至Django项目的方法,同时提供Redis监控与常见问题排错方案。
377 1
|
4月前
|
监控 NoSQL 网络协议
Django 实时通信实战:WebSocket 与 ASGI 全解析(上)
WebSocket 是一种全双工通信协议,支持实时数据传输,适用于聊天、协作、监控等场景。ASGI 是异步 Web 标准,配合 Uvicorn 服务器和 Django Channels,可实现 Django 的 WebSocket 功能,提升实时应用性能。
218 0
|
4月前
|
自然语言处理 开发者 Python
Django 实战:I18N 国际化与本地化配置、翻译与切换一步到位
Django国际化与本地化指南,涵盖i18n和l10n的定义、配置、视图与模型中的翻译使用、消息文件生成与编译,以及多语言登录实战。助你打造多语言支持的Web应用。
189 0
|
4月前
|
Shell 数据库 网络架构
Django+DRF 实战:从异常捕获到自定义错误信息(下)
本文详解了 Django REST Framework 中 ValidationError 的验证流程与优先级,涵盖字段内置验证、自定义验证方法、对象级验证及数据库约束,并通过实战演示如何自定义异常提示信息。
115 1
Django+DRF 实战:从异常捕获到自定义错误信息(下)
|
4月前
|
人工智能 开发工具 数据库
Django实战:Python代码规范指南
PEP 8 是 Python 官方代码风格指南,提升代码可读性与团队协作效率。本文详解命名规范、注释写法、常用工具(如 Black、flake8)、编程实践与代码优化技巧,助力写出规范、易维护的 Python 代码。
219 7
|
3月前
|
缓存 监控 中间件
Django中间件自定义开发指南:从原理到实战的深度解析
Django中间件是Web应用的“交通警察”,在请求与响应过程中进行全局处理,适用于身份验证、日志记录、性能监控等功能。本文详解中间件的工作原理、开发步骤及实战案例,帮助开发者掌握自定义中间件的构建方法,提升Django应用的可维护性与扩展性。
201 0
|
4月前
|
存储 前端开发 应用服务中间件
Django 实战:静态文件与媒体文件从开发配置到生产部署
Django项目中,静态文件(Static Files)和媒体文件(Media Files)是两类不同用途的文件。本文详细介绍了它们的区别、配置方法以及在开发与生产环境中的处理方式,并结合用户头像上传功能进行实战演示,最后讲解了如何通过Nginx或OpenResty部署静态与媒体文件服务。
224 1
|
3月前
|
缓存 NoSQL 数据库
Django缓存机制详解:从配置到实战应用
本文全面解析Django缓存技术,涵盖配置方法与六大缓存后端,结合实战场景演示四种典型应用方式,帮助开发者提升Web应用性能,应对高并发挑战。
79 0
|
3月前
|
存储 缓存 数据库
Django模型开发全解析:字段、元数据与继承的实战指南
Django模型是业务逻辑与数据库的核心桥梁,本文详解模型开发三大核心:字段类型选择、元数据配置与继承模式应用,涵盖实战技巧与常见问题解决方案,助你构建高效可维护的数据模型。
108 0