"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 243 篇文章
  • 累计创建 15 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Celery Asynchronous Task Queue

PySuper
2025-02-16

Using Celery as an asynchronous task queue in Django

config/settings/base

if USE_TZ:
    # Keep Celery's time zone in sync with the project's
    CELERY_TIMEZONE = TIME_ZONE
# Celery application name
CELERY_APP_NAME = "affect"
# Store task results in a backend (django-celery-results)
# CELERY_RESULT_BACKEND = "django-db"
# Disable time-zone awareness in django-celery-beat
DJANGO_CELERY_BEAT_TZ_AWARE = False
# Propagate exceptions when tasks run eagerly
CELERY_TASK_EAGER_PROPAGATES = True
# Keep Celery from hijacking the root logger
CELERYD_HIJACK_ROOT_LOGGER = False
# Broker URL: the Redis database that holds the task queue
# (prefer reading credentials from the environment over hard-coding them)
# CELERY_BROKER_URL = env("CELERY_BROKER_URL")
# CELERY_BROKER_URL = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{BROKER_DB}"
CELERY_BROKER_URL = "redis://:Affect_PySuper@1.13.0.17:16379/13"
# Result backend alternatives (Redis)
# CELERY_RESULT_BACKEND = f"redis://:{REDIS_PASSWD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
# CELERY_RESULT_BACKEND = "redis://:Affect_PySuper@1.13.0.17:16379/14"
# Accepted content types
CELERY_ACCEPT_CONTENT = ["json"]
# Task serializer
CELERY_TASK_SERIALIZER = "json"
# Result serializer
CELERY_RESULT_SERIALIZER = "json"
# Hard task time limit (5 minutes)
CELERY_TASK_TIME_LIMIT = 5 * 60
# Soft task time limit (1 minute)
CELERY_TASK_SOFT_TIME_LIMIT = 60
# Use the database-backed scheduler from django-celery-beat
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# Disable UTC so schedules run in local time
CELERY_ENABLE_UTC = False
# CELERY_BEAT_SCHEDULE = {
#     # Periodic task that checks task completion
#     "check-task-completion": {
#         # Dotted path to the task
#         "task": "apps.common.tasks.check_task_completion",
#         # Run every day at 01:00
#         "schedule": crontab(hour="1"),
#     },
#     "close_tasks": {
#         "task": "apps.common.tasks.close_tasks",
#         # "schedule": crontab(hour="0"),
#         "schedule": timedelta(seconds=3),
#         "args": (random.randint(1, 1000),),
#     },
#     "complex_tasks": {
#         "task": "apps.common.tasks.complex_tasks",
#         # "schedule": crontab(hour="0"),
#         "schedule": timedelta(seconds=3),
#         "args": ([10, 2],),
#     },
# }
# Task modules to import; multiple modules may be listed
CELERY_IMPORTS = ("apps.common.tasks",)
# Task queue definitions (requires: from kombu import Exchange, Queue)
CELERY_TASK_QUEUES = (
    Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
)
# Default queue name
CELERY_TASK_DEFAULT_QUEUE = "default"
# Default exchange name
CELERY_TASK_DEFAULT_EXCHANGE = "default"
# Default routing key
CELERY_TASK_DEFAULT_ROUTING_KEY = "default"
# Maximum number of task retries
CELERY_TASK_MAX_RETRIES = 3
# Default retry delay (5 minutes)
CELERY_TASK_DEFAULT_RETRY_DELAY = 5 * 60
# Execute tasks synchronously in-process (useful for tests)
CELERY_TASK_ALWAYS_EAGER = False
# Disable worker rate limits
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# Worker concurrency
CELERY_WORKER_CONCURRENCY = 4
# Worker prefetch multiplier
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# Worker log format
CELERY_WORKER_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
# Worker task log format
CELERY_WORKER_TASK_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s"
# Log level for redirected stdout/stderr
CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
# Store task results in the Django database (django-celery-results)
CELERY_RESULT_BACKEND = "django-db"
# Cache results in the Django cache
CELERY_CACHE_BACKEND = "django-cache"
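
With three queues defined above, individual tasks can be pinned to a queue through Celery's task routes. A minimal sketch; the task-to-queue mapping here is illustrative, not part of the original configuration:

# config/settings/base (sketch): route tasks to the queues defined above;
# anything unmatched falls back to the "default" queue
CELERY_TASK_ROUTES = {
    "apps.common.tasks.close_tasks": {"queue": "high_priority"},
    "apps.common.tasks.complex_tasks": {"queue": "low_priority"},
}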

config/celery_config

import os
import random
from datetime import timedelta

import django
from celery import Celery

# from celery.schedules import crontab
from django.conf import settings

# Read the target settings profile from the environment; default to "local"
profile = os.environ.get("PROJECT_SETTINGS", "local")

# Point Django at the matching settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"config.settings.{profile}")
django.setup()

# Create the Celery application
app = Celery(settings.CELERY_APP_NAME)

# Load Celery configuration from Django settings; with the CELERY_
# namespace, every relevant setting must start with CELERY_
app.config_from_object("django.conf:settings", namespace="CELERY")

# Retry broker connections on startup
app.conf.broker_connection_retry_on_startup = True

# Use the project's time zone
app.conf.timezone = settings.TIME_ZONE

# Auto-discover tasks.py modules in each installed app
app.autodiscover_tasks()

# celery-once configuration: use Redis locks so each task runs only once at a time
app.conf.ONCE = {
    "backend": "celery_once.backends.Redis",
    "graceful": True,
    "settings": {
        "url": "redis://:Affect_PySuper@1.13.0.17:16379/13",
        "default_timeout": 60 * 60,
    },
}

# Periodic task schedule
app.conf.beat_schedule = {
    "close_tasks": {
        "task": "apps.common.tasks.close_tasks",
        # "schedule": crontab(hour="0"),    # crontab suits longer, calendar-based schedules
        "schedule": timedelta(seconds=3),  # timedelta suits short, fixed intervals
        # Arguments are evaluated once when the schedule is defined,
        # not on every run (NOTE: close_tasks expects a task_id argument)
        # "args": (random.randint(1, 1000),),
    },
    "complex_tasks": {
        "task": "apps.common.tasks.complex_tasks",
        "schedule": timedelta(seconds=3),
    },
}
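
For workers and Django to share this app instance, the usual pattern is to re-export it from the config package's __init__. A sketch, assuming this project's config package layout:

# config/__init__.py (sketch)
# Ensure the Celery app is loaded whenever Django starts,
# so that @shared_task decorators bind to it
from .celery_config import app as celery_app

__all__ = ("celery_app",)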

apps/common/tasks

from celery_once import QueueOnce
from django.utils import timezone

from config.celery_config import app
from tasks.models import Tasks
from utils.logger import logger


@app.task(bind=True, base=QueueOnce)
def close_tasks(self, task_id):
    """
    QueueOnce guarantees only one instance of this task runs at a time.
    When a task is created, a delayed Celery job closes it once it expires.
    :param self: bound task instance
    :param task_id: ID of the task to close
    :return:
    """
    try:
        task = Tasks.objects.filter(id=task_id).first()
        if task:
            task.status = "completed"
            task.save()
            logger.info(f"==> Close Task Success: {task_id}!")
    except Exception as e:
        logger.error(f"==> Close Task Error: {task_id} ==> {str(e)}", exc_info=True)
        raise self.retry(exc=e, max_retries=5, countdown=60)


@app.task(bind=True)
def repeated_tasks(self):
    """
    Periodic task (e.g. daily at 00:00) that closes all expired tasks.
    :return:
    """
    tasks = Tasks.objects.filter(status="pending", deadline__lt=timezone.now())
    for task in tasks:
        try:
            task.status = "completed"
            task.save()
            logger.info(f"==> Close Task Success: {task.id}!")
        except Exception as e:
            logger.error(f"==> Close Task Error: {task.id} ==> {str(e)}", exc_info=True)

apps/tasks/views/tasks

from datetime import datetime, timedelta

from django.http import JsonResponse
from django.utils.timezone import make_aware
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny

from apps.common.tasks import close_tasks

# CoreViewSet, Tasks and TaskSerializer come from the project's own modules
# (their imports are not shown in the original post)


class TasksViewSet(CoreViewSet):
    """Task management"""

    queryset = Tasks.objects.all().order_by("-id")
    serializer_class = TaskSerializer
    permission_classes = [AllowAny]
    search_fields = ["title"]
    filterset_fields = {"status": ["exact"], "responsible_person": ["exact"]}
    ordering_fields = ["id", "created_at"]

    @action(methods=["GET"], detail=False)
    def get_mobile(self, request):
        mobile = int(request.GET.get("mobile"))

        # Schedule close_tasks to run 3 seconds from now
        # (eta expects an aware datetime when USE_TZ is enabled)
        eta = make_aware(datetime.now() + timedelta(seconds=3))
        res = close_tasks.apply_async(args=[mobile], eta=eta)

        return JsonResponse({"code": 200, "msg": res.id})
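
Because the settings use django-celery-beat's DatabaseScheduler, periodic tasks can also be created at runtime through its models rather than in code. A sketch using django_celery_beat's models (the interval and task name are illustrative):

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Run apps.common.tasks.repeated_tasks every 10 seconds via the database scheduler
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10, period=IntervalSchedule.SECONDS
)
PeriodicTask.objects.get_or_create(
    name="close-expired-tasks",
    task="apps.common.tasks.repeated_tasks",
    interval=schedule,
)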

command

# Install
pip install celery eventlet django-celery-beat django-celery-results flower

# Windows cannot use the default prefork pool: pip install eventlet, then add -P eventlet

# Worker (Linux)
celery -A config.celery_config worker -l info

# Worker (Windows)
celery -A config.celery_config worker -l info -P eventlet

# Beat
celery -A config.celery_config beat -l info

# Flower
celery -A config.celery_config flower --port=5555
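
Since the settings define several queues, a worker can also be limited to particular queues with the standard -Q flag; a sketch using the queue names from this post:

# Optional: dedicate a worker to specific queues
celery -A config.celery_config worker -l info -Q high_priority,default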
