赞
踩
推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler
库 | 大小 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
Schedule | 轻量级 | 易用无配置 | 不能动态添加任务或持久化任务 | 简单任务 |
Celery | 重量级 | ①任务队列 ②分布式 | ①不能动态添加定时任务到系统中,如Flask(Django可以) ②设置起来较累赘 | 任务队列 |
APScheduler | 相对重量级 | ①灵活,可动态增删定时任务并持久化 ②支持多种存储后端 ③集成框架多,用户广 | 重量级,学习成本大 | 通用 |
Rocketry | 轻量级 | 易用功能强 | 尚未成熟,文档不清晰 | 通用 |
Celery不适合动态添加定时任务,但本人认为可以通过数据库+递归调用自身实现
Celery
是一款简单灵活可靠的分布式任务执行框架,支持大量任务的并发执行。
Celery
采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。
应用场景
本文使用 Redis 作为 Broker 即消息队列
pip install celery
pip install redis
需要持久化任务的话,Broker 使用 RabbitMQ 并设置持久化队列。
官方建议生产环境首选 RabbitMQ ,突然停止或断电 Redis 可能会数据丢失。
注意!从 Celery 4.x 开始官方不再支持Windows。
Celery 的开发主要有四个步骤:
启动 Redis
redis-server
实例化 Celery 和 定义任务
tasks.py
import time
from celery import Celery
celery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1') # 实例化 Celery
@celery.task
def sendmail(mail): # 定义任务。使用@task装饰器
print('sending mail to %s...' % mail['to'])
time.sleep(2.0)
print('mail sent.')
return True
启动任务 Worker
celery -A tasks worker --loglevel=info --pool=solo
调用任务
from tasks import sendmail
result = sendmail.delay(dict(to='celery@python.org'))
value = result.get()
print(value) # 运算结果值
print(result.successful()) # 是否成功
# print(result.fail()) # 是否失败
print(result.ready()) # 是否执行完成
print(result.state) # 状态 PENDING -> STARTED -> SUCCESS/FAILURE
结果
.
└─proj
config.py # 配置文件
__init__.py # Celery实例化
tasks.py # 实时任务
period_task.py # 定时任务
config.py
BROKER_URL = 'redis://localhost:6379/0' # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 导入的任务模块
'proj.tasks',
'proj.period_task'
)
流行使用RabbitMQ作为Broker中间件,Redis作为结果后端。
__init__.py
from celery import Celery
app = Celery('proj') # 创建Celery实例
app.config_from_object('proj.config') # 从配置文件中读取配置
tasks.py
from proj import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def show(a):
return a
启动任务Worker
celery worker -A proj -l info -c 4 -P solo
period_task.py
from proj import app
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
"""按频率执行定时任务"""
# 每5秒执行一次tostring('Hello')
sender.add_periodic_task(5.0, tostring.s('Hello'), name='tostring')
# 每周一07:30执行tostring('Happy Mondays!')
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
tostring.s('Happy Mondays!'),
)
# 每分钟执行一次
sender.add_periodic_task(
crontab(minute='*/1'),
tostring.s('A minute'),
name='A minute'
)
@app.task
def tostring(s):
return s
先忽略这部分
更细粒度定时设置查阅:
定时任务配置也可以这样设置
from proj import app
from celery.schedules import crontab
@app.task
def tostring(s):
return s
app.conf.beat_schedule.update(
hello={
'task': tostring.name,
'schedule': 5.0, # 每5秒执行一次tostring('Hello')
'args': ('Hello',)
},
happy_mondays={
'task': tostring.name,
'schedule': crontab(hour=7, minute=30, day_of_week=1), # 每周一07:30执行tostring('Happy Mondays!')
'args': ('Happy Mondays!',)
},
a_minute={
'task': tostring.name,
'schedule': crontab(minute='*/1'), # 每分钟执行一次
'args': ('A minute',)
},
)
常规任务
delay()
:直接调用任务,是 apply_async()
的封装apply_async()
:通过发送异步消息调用任务,可指定倒计时 countdown ,执行时间 eta ,过期时间 expires 等参数signature()
:创建签名,可传递任务签名给别的进程使用,或作为其他函数的参数s()
:创建签名的快捷方式from wedo.tasks import mul
result = mul.delay(1, 2) # 直接调用
print(result.get())
result = mul.apply_async((1, 2), countdown=2) # 2s后执行
print(result.get())
t1 = mul.signature((1, 2), countdown=2) # 签名Signatures,可传递任务签名给别的进程使用,或作为其他函数的参数
result = t1.delay()
print(result.get())
t1 = mul.s(1, 2).set(countdown=2) # 创建签名的快捷方式
result = t1.delay()
print(result.get())
组合任务
group()
:组合,接受一个可并行调用的任务列表chain()
:串联,将签名连接在一起,一个接一个调用(前一个签名的结果作为下一个签名的第一个参数)chord()
:和弦,类似 group()
但包含回调,在所有任务执行完后再调用任务map()
:将参数列表应用于该任务starmap()
:将复合参数列表应用于该任务chunks()
:将一个很长的参数列表分块成若干部分from proj.tasks import add, mul, show
from celery import group, chain, chord
result = group(add.s(i, i) for i in range(5))() # 组合
print(result.get()) # [0, 2, 4, 6, 8]
result = chain(add.s(1, 2), add.s(3), mul.s(3))() # 串联
print(result.get()) # ((1+2)+3)*3=18
result = chord((add.s(i, i) for i in range(5)), show.s())() # 和弦
print(result.get()) # [0, 2, 4, 6, 8]
result = ~show.map(['Hello', 'World'])
print(result) # ['Hello', 'World']
result = ~add.starmap([(2, 2), (4, 4)])
print(result) # [4, 8]
res = add.chunks(zip(range(10), range(10)), 2)()
print(res.get()) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
详细查阅:
启动定时任务Beat
celery beat -A proj.period_task -l info
结果
项目结构
.
└─monitor
config.py # 配置文件
__init__.py # Celery实例化
tasks.py # 实时任务
main.py # 调用任务
config.py
BROKER_URL = 'redis://localhost:6379/0' # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 导入的任务模块
'monitor.tasks'
)
__init__.py
from celery import Celery, Task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__) # 日志
app = Celery('monitor') # 创建Celery实例
app.config_from_object('monitor.config') # 从配置文件中读取配置
class TaskMonitor(Task):
def on_success(self, retval, task_id, args, kwargs):
"""success时回调"""
logger.info('task id:{} , arg:{} , successful !'.format(task_id, args))
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""retry时回调"""
logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc))
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""failure时回调"""
logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))
tasks.py
from monitor import app, TaskMonitor
@app.task(base=TaskMonitor)
def success():
return 1
@app.task(bind=True, base=TaskMonitor)
def retry(self):
try:
raise Exception
except Exception as exc:
self.retry(exc=exc)
@app.task(base=TaskMonitor)
def failure():
raise Exception
main.py
from monitor.tasks import success, retry, failure
result = success.delay()
print(result.successful()) # 是否成功
print(result.ready()) # 是否执行完成
print(result.state) # 状态 PENDING -> STARTED -> SUCCESS/FAILURE
print()
result = retry.delay()
print(result.successful())
print(result.ready())
print(result.state)
print()
result = failure.delay()
print(result.successful())
print(result.ready())
print(result.state)
效果
tasks.py
from celery import Celery
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
imports=['tasks']
)
app.conf.timezone = 'Asia/Shanghai'
i = 0
@app.task
def show(a):
global i
i = i + 1
s = '{} {}'.format(a, i)
print(s)
if i >= 20:
i = 0
return
show.apply_async(args=(a,), countdown=i) # 递归
调用任务
from tasks import show
show.apply_async(args=('Hi',))
结果
可以直接通过代码配置而不用 Celery.config_from_object()
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
# imports=['tasks']
)
app.conf.imports = ['tasks']
详细查阅:Configuration and defaults
参数 | 含义 | 全称 |
---|---|---|
-A | 指定模块 | |
-l | 日志level | –loglevel |
-c | 进程数 | –concurrency |
-Q | 指定队列 | –queue |
-B | 周期性任务 | –beat |
-P | 池的实现 | –pool |
部署过程和单机启动一样,使用相同项目代码和启动命令。
实现原理是共享 Broker 队列。
flower
是一款 Celery 的监控工具
安装
pip install flower
启动
flower -A wedo --port=5555
Celery 版本大于4.4.7可能会报错
Celery不适合动态添加定时任务,但本人认为可以通过数据库+递归调用自身实现
最好用 APSchedule 实现,因为动态定时任务需要用到长时间的 countdown 或 eta,若这样的定时任务过多,会大量占用内存,导致重启和执行非延迟任务会很耗时。
并且使用Redis作为Broker并且异步任务执行时间延迟超过1小时,Celery会重复发布任务,导致任务重复执行
更好的解决办法是每天定时跑!!即实现from celery.schedules import crontab
celery.conf.beat_schedule.update
tasks.py
import sqlite3
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__) # 日志
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
imports=['tasks'],
)
app.conf.timezone = 'Asia/Shanghai'
conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks`
(
`id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
`name` TEXT,
`countdown` INTEGER
);
'''
c.execute(sql) # 创建数据库
def create(name='job', countdown=5):
"""创建定时任务"""
sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'
c.execute(sql, (name, countdown))
conn.commit()
return c.lastrowid
def read(id=None, verbose=False):
"""查询定时任务"""
sql = 'SELECT * FROM `tasks` '
if id:
sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)
all_rows = c.execute(sql).fetchall()
if verbose:
print(all_rows)
return all_rows
def update(id, countdown):
"""修改定时任务"""
sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'
c.execute(sql, (countdown, id))
conn.commit()
def delete(id, verbose=False):
"""删除定时任务"""
sql = 'DELETE FROM `tasks` WHERE `id`=?'
affected_rows = c.execute(sql, (id,)).rowcount
if verbose:
print('已删除{}行数据'.format(affected_rows))
conn.commit()
@app.task
def job(id):
# 读取定时任务数据
id = read(id)
if id:
id, name, countdown = id[0]
else:
logger.info('stop')
return
# 需要进行的任务
logger.warning('id={}'.format(id))
logger.warning('name={}'.format(name))
logger.warning('countdown={}'.format(countdown))
# 递归调用
job.apply_async(args=(id,), countdown=countdown)
main.py
from tasks import *
print('创建定时任务')
id = create(name='job', countdown=5)
job(id) # 立即运行
# job.apply_async((id,), countdown=5) # 5s后运行
print('查询定时任务:', read())
input('回车修改定时任务')
update(id, countdown=1)
input('回车删除定时任务')
delete(id, verbose=True)
启动
celery -A tasks worker --loglevel=info --pool=solo
效果
参数 eta
受时区影响,本人感觉较麻烦,可以直接使用 countdown
from datetime import datetime, timedelta
def next_weekday(weekday, d=datetime.now()):
""" 获取下周几日期
:param weekday: weekday取值1-7
:param d: 原日期,默认当前时间
:return: datetime.datetime
"""
delta = weekday - d.isoweekday()
if delta == 0:
delta = 7
return d + timedelta(delta)
d1 = datetime.now()
d2 = next_weekday(1, d1) # 下周一
delta = d2 - d1
countdown = delta.total_seconds()
print(countdown) # 604800.0
Redis默认不过期
长时间的countdown可能会过期,考虑 Redis 设为不过期或使用 RabbitMQ
redis-cli
KEYS *
TTL _kombu.binding.celery
使用Celery+Redis实现,异步执行任务,获取执行状态
1. 报错 ValueError: not enough values to unpack (expected 3, got 0)
启动 Celery 添加参数 --pool=solo
2. 报错 Cannot connect to redis://localhost:6379/0: Error 11002 connecting to localhost:6379. Lookup timed out
参考这篇文章
3. 重启Celery后任务丢失
task_reject_on_worker_lost = True
task_acks_late = True
or
celery.conf.CELERY_REJECT_ON_WORKER_LOST = True
celery.conf.CELERY_ACKS_LATE = True
4. 使用Redis作为Broker并且异步任务执行时间延迟超过1小时,Celery会重复发布任务,导致任务重复执行
将Celery升级到4.5以上并增大visibility_timeout的时间
默认visibility_timeout为3600秒即一小时
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
或者
app.conf.broker_transport_options = {'visibility_timeout': 43200}
还可以通过 MySQL 的唯一索引特性实现锁:在业务中添加锁,能插入就执行
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。