Celery在Django中的使用

Celery在Django中的使用

参考1

Celery4.0只支持Django版本>=1.8的,如果是小于1.8版本需要使用Celery3.1。 django-celery-beat 2.X 支持Django 2.x和3.x django-celery-beat 1.6以下支持Django 1.11以下

  1. 安装 pip install celery,Django需要安装django-celery-beat插件

  2. 配置

    • 在Django项目的app目录下新增tasks.py文件,用于定义任务

    • 在app目录下新建celery.py:

      !/usr/bin/env python3
      -- coding:utf-8 --
      Author:wd
      from future import absolute_import, unicode_literals
      import os
      from celery import Celery
      
      os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'projectname.settings')  # 设置django环境
      
      app = Celery('appname')  # 使用跟projectname一致的appname,既appname=projectname
      
      app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置
      
      app.autodiscover_tasks()  # 发现任务文件每个app下的task.py
      ```
      
      修改`unoecp/unoecp/__init__.py`:
      
      ```python
      from __future__ import absolute_import, unicode_literals
      from .celery import app as celery_app
      __all__ = ['celery_app']
      ```
      
      修改`unoecp/unoecp/settings.py`配置:  
      
      ```python
      CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
      
      CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
      
      CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
  3. 运行: 进入项目的unoecp目录启动worker: celery worker -A unoecp -l debug Celery>5.0 -A 参数位置在celery之后workder之前 celery -A unoecp worker -l debug

  4. 定义与触发任务

    4.1. 任务定义在每个tasks文件中,app01/tasks.py

    from future import absolute_import, unicode_literals from celery import shared_task
    
    @shared_task
    def add(x, y):
      return x + y
    
    @shared_task
    def mul(x, y):
        return x * y

    4.2. 视图触发任务:

    from django.http import JsonResponse
    from app01 import tasks
    
    # Create your views here.
    
    def index(request,*args,**kwargs):
      res=tasks.add.delay(1,3)
      #任务逻辑
      return JsonResponse({'status':'successful','task_id':res.task_id})

    4.3. 访问http://127.0.0.1:8000/index

    4.4. 若想获取任务结果,可以通过task_id使用AsyncResult获取结果,还可以直接通过backend获取:get celery-task-meta-xxxxxxxxxxxxxx

  5. 扩展 除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。

5.1. 安装 pip install django-celery-results==1.2.1

注意 django-celery-results==1.2.1支持Django1.11, django-celery-results>=2.0支持Django 2.x+ django-celery-results==1.2.1 依赖 celery-4.4.7,不支持celery-5.x

5.2. 配置settings.py,注册app

5.3. 修改backend配置,将redis改为django-db

5.4. 修改数据库python3 manage.py migrate django_celery_results

5.5. 当然你有时候需要对task表进行操作,以下源码的表结构定义:

  1. Django中使用定时任务 如果想要在django中使用定时任务功能同样是靠beat完成任务发送功能,当在Django中使用定时任务时,需要安装django-celery-beat插件。以下将介绍使用过程。

    6.1. 安装pip3 install django-celery-beat

    6.2. 注册APP

    6.3. 数据库变更python3 manage.py migrate django_celery_beat 6.4. 分别启动woker和beta

    6.5. 配置admin

    6.6. 创建用户python3 manage.py createsuperuser

    6.7. 登录admin进行管理(地址http://127.0.0.1:8000/admin)并且还可以看到我们上次使用orm作为结果存储的表。 http://127.0.0.1:8000/admin/login/?next=/admin/

    6.8. 二次开发 django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例: settings.py

    urls.py

    views.py

    访问http://127.0.0.1:8000/tasks如下:

最后更新于

这有帮助吗?