赞
踩
定时器是一种非常有用的工具,用于执行特定任务或函数,例如定时任务、轮询操作、定时检查等。Python提供了多种方式来创建和使用定时器,本文将介绍一些高级的定时器使用方式,包括使用threading模块、使用schedule库以及在异步编程中使用定时器。
这是最基础的计时方法,通过让程序暂停一定时间来实现简单的定时任务
#!/usr/bin/env python
# coding=utf-8
import time
def task():
print("任务执行中...")
interval = 5 # 间隔5秒
while True:
task()
time.sleep(interval)
threading.Timer是Python标准库threading中的一个类,用于在指定时间后启动一个任务。它主要用于简单的延迟任务执行,适合于不需要复杂调度逻辑的场景。其使用相对简单,但在处理复杂的调度任务时可能会显得力不从心。
常用场景
延迟执行任务:在需要任务延迟一段时间执行时,如启动延迟、简单重试机制等。
单次定时任务:适合于只需要执行一次的定时任务
import threading
def my_task():
print("My Task Executing")
timer = threading.Timer(10, my_task) # 10秒后执行my_task
timer.start()
timer.cancel() #取消定时器: 如果定时器尚未执行,可以使用cancel()方法取消
sched模块是Python自带的一个事件调度模块,它提供了一个通用的方法来定时运行任务。
3.1 enter
enter(delay, priority, action, argument=(), kwargs={})
这个方法用于调度一个事件。事件将会在指定的延迟时间delay后被执行。priority参数用于同一时间到达的两个事件进行排序:具有较小priority值的事件会被先执行。action参数则是当事件被执行时会调用的函数,argument是传递给action的参数组成的元组,kwargs是传递给action的关键字参数组成的字典。
调用enter方法会返回一个事件对象,这个对象可以用于后续取消该事件。
3.2 cancel(evt)
cancel(event)
此方法用于取消一个已经被安排的事件。要取消一个事件,你需要传递给cancel方法之前调用enter方法时返回的事件对象。一旦取消,该事件就不会执行。
如果尝试取消一个不存在或已经执行的事件,则会抛出一个ValueError异常。
3.3 empty
empty()
这个方法用于检查调度器队列中是否有待执行的事件。如果事件队列为空,它返回True,否则返回False。
没有事件等待执行通常意味着调度器的run方法将立即返回,因为它没有任何工作可做。
#!/usr/bin/env python # coding=utf-8 """ # @Time : 2024/4/26 # @Author : Summer # @File : test # @describe: """ import sched import time from datetime import datetime # 初始化sched模块的scheduler类 schedule = sched.scheduler(time.time, time.sleep) def task(inc): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") print("Task executed at:", ts) # 这里安排下一次执行的任务 evt = schedule.enter(inc, 0, task, (inc,)) # 假设我们在这里确定了需要取消未来某个计划任务的条件 # 这里仅为示例,实际情况下应依据具体逻辑 if datetime.now().second % 10 == 0: # 假设当当前秒数是10的倍数时取消后续任务 print("Cancelling the future task.") schedule.cancel(evt) # 取消未来的任务 def func(inc=5): # 调度第一次执行任务 schedule.enter(0, 0, task, (inc,)) schedule.run() func() # 示例结束后,检查调度器队列是否为空 if schedule.empty(): print("Scheduler queue is empty. No more tasks to run.")
在Python3.4以上版本中,可以使用asyncio实现异步的定时器。
#!/usr/bin/env python # coding=utf-8 """ # @Time : 2024/4/26 # @Author : Summer # @File : test # @describe: """ import asyncio async def task(): print("任务执行中...") async def main(): while True: await task() await asyncio.sleep(5) asyncio.run(main())
常用场景
分布式系统中的异步任务处理:例如,在Web应用中异步发送电子邮件、生成报表等。
周期性任务调度:利用Celery Beat,可以很方便地实现周期性任务的调度,如每天清理数据库、定时抓取数据等。
执行周期性任务(结合Celery Beat):
#!/usr/bin/env python # coding=utf-8 """ # @Time : 2024/4/26 # @Author : Summer # @File : test # @describe: """ from celery import Celery from datetime import timedelta app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def my_task(arg): print(arg) app.conf.beat_schedule = { 'execute-my-task-every-10-seconds': { 'task': 'my_task', 'schedule': timedelta(seconds=10), 'args': ("Periodic task executed",) }, }
主要用于测量小段代码的执行时间,虽然不是用来作为定时器,但在某些情况下可以用来作为计时工具
#!/usr/bin/env python # coding=utf-8 """ # @Time : 2024/4/26 # @Author : Summer # @File : test # @describe: """ import timeit def task(): print("任务执行中...") # timeit默认执行1000000次,这里设置number=1做演示 timeit.timeit(task, number=1)
APScheduler是一个Python库,用于开发复杂的任务调度。与threading.Timer相比,APScheduler提供了丰富的功能,如任务持久化、作业存储选择、多种触发条件等,更适合于需要复杂调度策略的应用。
常用场景
周期性任务调度:例如,定时进行数据库的备份、定期清理日志文件等。
动态任务调度:可以根据运行时的情况动态地添加、修改或删除任务。
多种调度策略:支持定点、定时、间隔、cron等多种调度方式。
常用方法
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
def my_task():
print("My Task Executing")
# 按间隔时间执行
scheduler.add_job(my_task, 'interval', minutes=10)
# 使用cron表达式执行
scheduler.add_job(my_task, 'cron', day_of_week='mon-fri', hour=6)
scheduler.start() # 启动调度器
# 如需关闭调度器
scheduler.shutdown()
job = scheduler.add_job(my_task, 'interval', hours=2)
job.pause() # 暂停作业
job.resume() # 恢复作业
job.remove() # 移除作业
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。