Django中的Celery异步任务队列
config/settings/base
if USE_TZ:
# 设置Celery使用的时区与项目时区一致
CELERY_TIMEZONE = TIME_ZONE
# Celery应用名称
CELERY_APP_NAME = "affect"
# 设置存储Celery任务队列的Redis数据库
# 设置存储任务结果的数据仓库
# CELERY_RESULT_BACKEND = "django-db"
# 禁用Django Celery Beat的时区感知
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 启用任务错误传播
CELERY_TASK_EAGER_PROPAGATES = True
# 禁止Celery劫持根日志记录器
CELERYD_HIJACK_ROOT_LOGGER = False
# Celery消息代理URL
# CELERY_BROKER_URL = env("CELERY_BROKER_URL")
# CELERY_BROKER_URL = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{BROKER_DB}"
CELERY_BROKER_URL = "redis://:Affect_PySuper@1.13.0.17:16379/13"
# Celery结果后端设置
# CELERY_RESULT_BACKEND = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
# CELERY_RESULT_BACKEND = "redis://:Affect_PySuper@1.13.0.17:16379/14"
# 指定接受的内容类型
CELERY_ACCEPT_CONTENT = ["json"]
# 任务序列化器
CELERY_TASK_SERIALIZER = "json"
# 结果序列化器
CELERY_RESULT_SERIALIZER = "json"
# 任务执行时间限制(5分钟)
CELERY_TASK_TIME_LIMIT = 5 * 60
# 任务软时间限制(1分钟)
CELERY_TASK_SOFT_TIME_LIMIT = 60
# 使用数据库作为任务调度器
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# 定时任务配置
CELERY_ENABLE_UTC = False
# CELERY_BEAT_SCHEDULE = {
# # 检查任务完成情况的定时任务
# "check-task-completion": {
# # 任务路径
# "task": "apps.common.tasks.check_task_completion",
# # 每天凌晨1点执行
# "schedule": crontab(hour="1"),
# },
# "close_tasks": {
# "task": "apps.common.tasks.close_tasks",
# # "schedule": crontab(hour="0"),
# "schedule": timedelta(seconds=3),
# "args": (random.randint(1, 1000),),
# },
# "complex_tasks": {
# "task": "apps.common.tasks.complex_tasks",
# # "schedule": crontab(hour="0"),
# "schedule": timedelta(seconds=3),
# "args": ([10, 2],),
# },
# }
# 指定导入的任务模块,可以指定多个
CELERY_IMPORTS = ("apps.common.tasks",)
# 定义Celery任务队列配置
CELERY_TASK_QUEUES = {
Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
Queue("default", Exchange("default"), routing_key="default"),
Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
}
# 设置默认队列名称
CELERY_TASK_DEFAULT_QUEUE = "default"
# 设置默认交换机名称
CELERY_TASK_DEFAULT_EXCHANGE = "default"
# 设置默认路由键
CELERY_TASK_DEFAULT_ROUTING_KEY = "default"
# 设置任务最大重试次数
CELERY_TASK_MAX_RETRIES = 3
# 设置任务重试延迟时间(5分钟)
CELERY_TASK_DEFAULT_RETRY_DELAY = 5 * 60 # 5 minutes
# 是否立即执行任务(测试用)
CELERY_TASK_ALWAYS_EAGER = False
# 是否禁用工作进程的速率限制
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# 设置工作进程的并发数
CELERY_WORKER_CONCURRENCY = 4
# 设置工作进程的预取倍数
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# 设置工作进程的日志格式
CELERY_WORKER_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
# 设置工作进程的任务日志格式
CELERY_WORKER_TASK_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s"
# 设置标准输出重定向的日志级别
CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
# 设置结果后端的数据库连接
CELERY_RESULT_BACKEND = "django-db"
# 设置缓存后端的数据库连接
CELERY_CACHE_BACKEND = "django-cache"
config/celery_config
import os
import random
from datetime import timedelta
import django
from celery import Celery
# from celery.schedules import crontab
from django.conf import settings
# 获取项目配置环境,默认为 local 环境
profile = os.environ.get("PROJECT_SETTINGS", "local")
# 设置Django的配置文件
os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"config.settings.local")
django.setup()
# 创建Celery实例
app = Celery(settings.CELERY_APP_NAME)
# 从Django的settings中读取Celery的配置
# 使用CELERY_命名空间,配置文件中,要以CELERY_开头
app.config_from_object("django.conf:settings", namespace="CELERY")
# 代理设置
app.conf.broker_connection_retry_on_startup = True
# 设置时区为UTC
app.conf.timezone = settings.TIME_ZONE
# 自动发现各个app下的tasks.py文件
app.autodiscover_tasks()
# Celery单任务配置,使用Redis作为后端
app.conf.ONCE = {
"backend": "celery_once.backends.Redis",
"graceful": True,
"settings": {
"url": "redis://:Affect_PySuper@1.13.0.17:16379/13",
"default_timeout": 60 * 60,
},
}
# 配置定时任务
app.conf.beat_schedule = {
"close_tasks": {
"task": "apps.common.tasks.close_tasks",
# "schedule": crontab(hour="0"), # crontab的时间格式为 小时:分钟:秒,较长时间
"schedule": timedelta(seconds=3), # timedelta的时间格式为 天:秒,较短时间
# 这里传递参数,每次执行任务时,参数都不会变化
# "args": (random.randint(1, 1000),),
},
"complex_tasks": {
"task": "apps.common.tasks.complex_tasks",
"schedule": timedelta(seconds=3),
},
}
apps/common/tasks
from celery_once import QueueOnce
from config.celery_config import app, timezone
from tasks.models import Tasks
from utils.logger import logger
@app.task(bind=True, base=QueueOnce)
def close_tasks(self, task_id):
"""
QueueOnce: 保证任务只执行一次
创建任务后,创建一个Celery定时任务,在任务到期时,关闭任务
:param self:
:param task_id: 接收任务ID
:return:
"""
try:
task = Tasks.objects.filter(id=task_id).first()
if task:
task.status = "completed"
task.save()
logger.info(f"==> Close Task Success: {task_id}!")
except Exception as e:
logger.error(f"==> Close Task Error:{task_id}==>{str(e)}", exc_info=True)
raise self.retry(exc=e, max_retries=5, countdown=60)
@app.task(bind=True)
def repeated_tasks(self):
"""
启动一个Celery定时任务,每天 0:00 执行一次,关闭到期的任务
:param task_id: 任务ID
:return:
"""
# 启动一个Celery定时任务,每天 0:00 执行一次,关闭到期的任务
tasks = Tasks.objects.filter(status="pending", deadline__lt=timezone.now())
for task in tasks:
try:
task.status = "completed"
task.save()
logger.info(f"==> Close Task Success: {task.id}!")
except Exception as e:
logger.error(f"==> Create Task Error:{task.id}==>{str(e)}", exc_info=True)
apps/tasks/views/tasks
class TasksViewSet(CoreViewSet):
"""任务管理"""
queryset = Tasks.objects.all().order_by("-id")
serializer_class = TaskSerializer
permission_classes = [AllowAny]
search_fields = ["title"]
filterset_fields = {"status": ["exact"], "responsible_person": ["exact"]}
ordering_fields = ["id", "created_at"]
@action(methods=["GET"], detail=False)
def get_mobie(self, request):
mobile = int(request.GET.get("mobile"))
from apps.common.tasks import close_tasks
from django.http import JsonResponse
# 设置任务执行时间为当前时间的10分钟后
eta = make_aware(datetime.now() + timedelta(seconds=3))
res = close_tasks.apply_async(args=[mobile], eta=eta)
return JsonResponse({"code": 200, "msg": res.id})
command
# Install
pip install celery eventlet django-celery-beat django-celery-results flower
# Windows 需要 eventlet: pip install eventlet,再加上 -P eventlet
# Worker(Linux)
celery -A config.celery_config worker -l info -P eventlet
# Worker(Windows)
celery -A config.celery_config worker -l info -P eventlet
# Beat
celery -A config.celery_config beat -l info
# Flower
celery -A config.celery_config flower --port-5555
参考:
评论区