关于celery的使用

news/2024/10/5 16:18:02

celery是什么?

 

 

 

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

另外, Celery还支持不同的并发和序列化的手段

  • 并发:Prefork, Eventlet, gevent, threads/single threaded
  • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

使用场景:

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

Celery具有以下优点

Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

Celery安装

pip install -U Celery

基本使用

创建项目celerypro

创建异步任务执行文件celery_task:

在celery_task目录下执行消费者

celery -A celery_task worker --loglevel=info

 

import celery
import time
backend='redis://127.0.0.1:6379/1'
broker='redis://127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):print("向%s发送邮件..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return "ok"

创建执行任务文件,produce_task.py:生产者直接run就ok

from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id)  

注意,异步任务文件命令执行:

celery worker -A celery_app_task -l info

创建py文件:result.py,查看任务执行结果,

from celery.result import AsyncResult
from celery_task import celasync_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)if async_result.successful():result = async_result.get()print(result)# result.forget() # 将结果删除
elif async_result.failed():print('执行失败')
elif async_result.status == 'PENDING':print('任务等待中被执行')
elif async_result.status == 'RETRY':print('任务异常后正在重试')
elif async_result.status == 'STARTED':print('任务已经开始被执行')

 

多任务结构:

 celery.py:

from celery import Celerycel = Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_tasks.task01','celery_tasks.task02'])# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

task01.py,task02.py:

#task01
import time
from celery_tasks.celery import cel@cel.task
def send_email(res):time.sleep(5)return "完成向%s发送邮件任务"%res#task02
import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):time.sleep(5)return "完成向%s发送短信任务"%name

produce_task.py:

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('yuan')
print(result.id)
result = send_msg.delay('yuan')
print(result.id)

check_result.py:

from celery.result import AsyncResult
from celery_tasks.celery import celasync_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)if async_result.successful():result = async_result.get()print(result)# result.forget() # 将结果删除,执行完成,结果不会自动删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async_result.failed():print('执行失败')
elif async_result.status == 'PENDING':print('任务等待中被执行')
elif async_result.status == 'RETRY':print('任务异常后正在重试')
elif async_result.status == 'STARTED':print('任务已经开始被执行')

开启work:celery worker -A celery_tasks -l info -P eventlet,添加任务(执行produce_task.py),检查任务执行结果(执行check_result.py)

如果第一个命令报错,请试试第二个:celery -A celery_tasks worker -l info -P eventlet

 如果提示 No module named 'eventlet'。需要安装 eventlet

pip install eventlet

Celery执行定时任务

 设定时间让celery执行一个定时任务,produce_task.py:

from celery_task import send_email
from datetime import datetime# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)

多任务结构中celery.py修改如下:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontabcel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_tasks.task01','celery_tasks.task02',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = Falsecel.conf.beat_schedule = {# 名字随意命名'add-every-10-seconds': {# 执行tasks1下的test_celery函数'task': 'celery_tasks.task01.send_email',# 每隔2秒执行一次# 'schedule': 1.0,# 'schedule': crontab(minute="*/1"),'schedule': timedelta(seconds=6),# 传递参数'args': ('张三',)},# 'add-every-12-seconds': {#     'task': 'celery_tasks.task01.send_email',#     每年4月11号,8点42分执行#     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),#     'args': ('张三',)# },
}
启动beat程序 tasks是task01的上层目录名称和Celery生成的名字
celery -A tasks beat -l INFO

# 启动 Beat 程序$ celery beat -A celery_tasks# Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
# 之后启动 worker 进程.$ celery -A celery_tasks worker -l info 或者$ celery -B -A celery_tasks worker -l info
beat是类似单个任务的product,命令:celery -A tasks beat -l INFO
work就是读消息队列执行:celery -A tasks worker -l info
 
linux通过systemctl启动
sudo systemctl daemon-reloadsudo systemctl enable flask
sudo systemctl start flasksudo systemctl enable celery_worker
sudo systemctl start celery_workersudo systemctl enable celery_beat
sudo systemctl start celery_beat

celery配置日志:

from celery import Celery
from celery.schedules import crontab
from config import Configdef make_celery(app):celery = Celery(app.import_name,broker=Config.CELERY_BROKER_URL,backend=Config.CELERY_RESULT_BACKEND)celery.conf.update(app.config)celery.conf.beat_schedule = {'daily-task': {'task': 'app.tasks.daily_task','schedule': crontab(hour=0, minute=0),  # 每天午夜执行},}celery.conf.update(worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',worker_task_log_format='[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s',worker_redirect_stdouts_level='INFO',worker_log_file='/var/log/celery_worker.log')return celery

 

 
redis查看队列任务:
在redis下执行命令 : lrange celery 0 -1
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/68027.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

帝国CMS图片集只能上传10张图片的原因及解决办法_max_file_uploads

在帝国CMS中上传图片时,如果发现上传多张图片但最终只显示部分图片,这通常是由于 PHP 配置中的 max_file_uploads 参数限制导致的。具体来说,这个参数限制了一个表单最多能上传多少个文件。 原因分析 在帝国CMS中,每张图片都会生成一张大图和一张缩略图,因此实际上每次上传…

帝国CMS为什么发布内容时间为“1970-01-01 ”

在发布内容时,如果时间显示为 1970-01-01,通常是因为以下几个原因:字段未设置为录入项:在建立系统模型时,newstime 字段没有被设置为录入项。 字段不可修改:即使设置了录入项,但该字段可能被设置为不可修改。 字段不可增加:该字段可能被设置为不可增加。解决方法 要解决…

网站避免发布内容时出现 1970-01-01 的时间显示问题

系统模型管理页面:在左侧菜单栏中选择“系统模型管理”。 在列表中找到需要编辑的系统模型,点击“编辑”。字段编辑页面:在字段列表中找到 newstime 字段。 在字段设置区域勾选“录入项”、“可修改”、“可增加”选项。 点击“保存”按钮。数据库表结构检查 如果上述设置已…

帝国CMS模板调用指定栏目的tag或当前栏目的tag

在帝国CMS模板中,可以通过不同的SQL查询方式来调用指定栏目中的所有TAG。以下是四种不同的方法及其解释。 方法1 SQL 查询sqlselect DISTINCT([!db.pre!]enewstags.tagname), [!db.pre!]enewstags.tagid, [!db.pre!]enewstags.num from [!db.pre!]enewstags inner join [!db…

帝国cms友情链接系统

一、友情链接系统说明 前台投票调用方式用友情链接标签调用plaintext[phomelink]每行显示数,显示总数,操作类型,分类id,是否显示原链接[/phomelink]例如:plaintext[phomelink]5,20,show,0,1[/phomelink]这表示每行显示5个链接,总共显示20个链接,操作类型为显示(show),分类…

【软考】4 存储系统

1、层次化存储体系 存储硬件: 注意,Cache位于CPU和主存之间,不属于主存部分 存储分类方式: 例题: 1、CPU访问存储器时,被访问数据一般聚集在一个较小的连续存储区域中。若一一个存储单元已被访问,则其邻近的存储单元有可能还要被访问,该特性被称为(A)。A、数据局部性…

帝国cms首页模板中调用一个html页面中内容方法

在帝国CMS首页模板中调用一个HTML页面内容有多种方法,具体取决于服务器是否支持 SSI(Server Side Includes)功能。以下是几种常见方法: 方法一:使用 PHP include 语句 如果服务器支持 PHP,则可以使用 include 语句来引入 HTML 页面内容。 示例代码html<?php include(…

帝国cms全站去版权方法

如果你希望去除帝国CMS中的版权信息,可以通过以下步骤进行操作。这些步骤主要涉及后台和前端的版权信息去除。 具体操作步骤后台起始页的版权信息。 后台左上角的Logo图片。 后台Logo下的快捷导航。 后台标题去除。 后台登录页面版权信息。 首页去版权信息。详细步骤扫码添加技…