阅读:73
1.0、 基本介绍
要使用django-crontab插件,只需要下载一个django-crontab包就可以使用cron表达式在Django框架中设置定时任务。这种方法不支持windows系统,功能也相对简单。
1.1、安装插件
pip install django-crontab
1.2、注册app
(1)在settings.py中INSTALLED_APPS引入app,完成子应用注册。
INSTALLED_APPS = [
...
'django_crontab'
]
(2)在settings.py中配置定时任务,在settings.py最后增加一下代码:
# 定时任务
'''
* * * * * :分别表示 分(0-59)、时(0-23)、天(1 - 31)、月(1 - 12) 、周(星期中星期几 (0 - 7) (0 7 均为周天))
crontab范例:
每五分钟执行 */5 * * * *
每小时执行 0 * * * *
每天执行 0 0 * * *
每周一执行 0 0 * * 1
每月执行 0 0 1 * *
每天23点执行 0 23 * * *
'''
CRONJOBS = [
('*/1 * * * *', 'base.crontabs.confdict_handle', ' >> /tmp/logs/confdict_handle.log'), # 注意:/tmp/base_api 目录要手动创建
]
或者:
CRONJOBS = [
('*/5 * * * *', 'appname.cron.test','>>/home/test.log')
]
'''
‘/5 * * *’ 遵循的是crontab 语法。
‘appname.cron.test’,这个appname就是你开发时加入到settings中的那个。因为你的cron.py文件就在这个下面,否则找不到路径。cron 就是你自己起的任务文件的名字。test就是执行的函数中的内容。
‘>>/home/test.log’,通常会输出信息到一个文件中,就使用这个方法,注意的是‘>>’表示追加写入,’>’表示覆盖写入。
'''
* * * * * command
分钟(0-59) 小时(0-23) 每个月的哪一天(1-31) 月份(1-12) 周几(0-6) shell脚本或者命令
核心语法:
CRONJOBS = [('*/5 * * * *', '任务路径.任务函数名','>>/home/book.log')]
参数说明:
‘*/5 * * *’ 表示五分钟一次,而django-crontab是调用Linux的crontab.
执行时间或者周期
【时间的顺序为分->时->天->月->周
】需要定时执行的函数(路径+函数名)
输出log信息的路径+log文件
常见的参数:
"*"表示可选的所有取值范围内的数字
;"/"表示'每',比如若第一个参数为/5,就是五分钟一次
,例如:*/5就是每5个单位- 代表从某个数字到某个数字
, 分开几个离散的数字
\quad应用示例:
- 每两个小时
0 */2 * * *
- 晚上11点到早上8点之间每两个小时,早上8点
0 23-7,8 * * *
- 每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点
0 11 4 * 1-3
- 1月1日早上4点
0 4 1 1 *
\quad有兴趣的小伙伴可以深入研究下 Linux 的crontab定时任务。参考链接:django 定时任务 django-crontab 的使用
在执行脚本中:
0 6 * * * commands >> /tmp/test.log # 每天早上6点执行, 并将信息追加到test.log中
0 */2 * * * commands # 每隔2小时执行一次
1.3、编写定时任务方法
在本例中是在apps/base/crontabs.py
(一般,执行add new file under appname, named cron.py)中增加的定时任务
from .models import ConfDict # base内的一个model,定时任务多数用来操作数据库,因此给一个示例
import datetime
# 定时任务
def confdict_handle():
try:
objs = CondDict.objects.all()
print(obj)
loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('本地时间:'+str(loca_time))
except Exception as e:
print('发生错误,错误信息为:', e)
1.4、如何使用&运行
(1)开启定时器
python manage.py crontab add
(2)将任务添加并生效(查看开启的定时器)
python manage.py crontab show
python manage.py crontab remove
(3)重启django服务,执行
corntab -e
(4)查看定时任务
crontab -l
此时,应该是可以看到系统中创建了该定时任务。
1.5、django-crontab插件优缺点:
2.0、基本介绍
django-apscheduler支持三种调度任务:固定时间间隔,固定时间点(日期),Crontab 命令。同时,它还支持异步执行、后台执行调度任务 配置简单、功能齐全、使用灵活、支持windows和linux,适合中小型项目。django-apscheduler中相关的概念和python的定时任务框架apscheduler中的概念是一样的
APScheduler的使用场景?
redis持久化存储时,使用APScheduler,使数据同步。
用户下单后使用,规定30min内必须支付,否则取消订单。
APScheduler 与 crontab 同为定时任务工具,有什么区别?
(1)crontab:
比如上面那个订单支付问题,crontab不知道要什么时候执行,所以它做不到
。(2)APScheduler:
2.1、安装插件
pip install django-apscheduler
或者
pip install apscheduler
2.2、使用插件
修改settings.py增加以下代码:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_apscheduler', # 新加入的定时任务插件django-apscheduler
'UserManger.apps.UsermangerConfig',
]
2.3、迁移数据库
因为django-apscheduler会创建表来存储定时任务的一些信息,所以将app加入之后需要迁移数据
python manage.py migrate
去数据库中看一看,生成了两个表格,大部分都顾名思义。
- django_apscheduler_djangojob——用于存储任务的表格
- django_apscheduler_djangojobexecution——用于存储任务执行状态的表格
参数说明:
- status: 执行状态
- duration: 执行了多长时间
- exception: 是否出现了什么异常
\quadNote:
这两个表用来管理你所需要的定时任务,然后就开始在任一view.py下写你需要实现的任务:
2.4、完整示例 在views.py中增加你的定时任务代码
注意:如果在其他文件中添加代码是没有效果的
from apscheduler.schedulers.background import BackgroundScheduler # 使用它可以使你的定时任务在后台运行
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
import time
'''
date:在您希望在某个特定时间仅运行一次作业时使用
interval:当您要以固定的时间间隔运行作业时使用
cron:以crontab的方式运行定时任务
minutes:设置以分钟为单位的定时器
seconds:设置以秒为单位的定时器
'''
try:
scheduler = BackgroundScheduler() # 创建定时任务的调度器对象——实例化调度器
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 'cron'方式循环,周一到周五,每天9:30:10执行,id为工作ID作为标记
# ('scheduler',"interval", seconds=1) #用interval方式循环,每一秒执行一次
@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')
#@register_job(scheduler, "interval", seconds=5)
def my_job(param1, param2): # 定义定时任务
# 定时每5秒执行一次
#t_now = time.localtime()
print(time.strftime('%Y-%m-%d %H:%M:%S'))
# 监控任务
register_events(scheduler)
# 向调度器中添加定时任务
scheduler.add_job(my_job, 'date', args=[100, 'python'])
# 启动定时任务调度器工作——调度器开始
scheduler.start()
except Exception as e:
print('定时任务异常:%s' % str(e))
2.6、如何使用&运行
apscheduler定时任务会跟随django项目一起运行因此直接启动django即可。
python manage.py runserver
2.7、django-apscheduler插件优缺点:
创建任务:
\quad
有两种创建任务的方法:装饰器和add_job函数
。
1. 装饰器
在任意view.py中实现代码
(我习惯新开一个app专门实现定时任务
):
典型范例:
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events
print('django-apscheduler')
def job2(name):
# 具体要执行的代码
print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))
# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加任务1
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')
# 每天8点半执行这个任务
#@register_job(scheduler, 'cron', id='test', hour=8, minute=30,args=['test'])
def job1(name):
# 具体要执行的代码
print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))
scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")
# 监控任务——注册定时任务
register_events(scheduler)
# 调度器开始运行
scheduler.start()
启动服务 python manage.py runserver
这个任务就会被存储到django_apscheduler_djangojob表
中,并按照设置定时的执行程序。
装饰器@register_job()参数说明:
还有些其他的参数感兴趣的同学可以查看源代码来了解。
2. add_job函数
装饰器的方法适合于写代码的人自己创建任务,如果想让用户通过页面输入参数,并提交来手动创建定时任务,就需要使用add_job函数
。
下面这个小例子,前端传递json数据给后端,触发test_add_task函数
,来添加任务:
import json
from django.http import JsonResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')
# 与前端的接口
def test_add_task(request):
if request.method == 'POST':
content = json.loads(request.body.decode()) # 接收参数
try:
start_time = content['start_time'] # 用户输入的任务开始时间, '10:00:00'
start_time = start_time.split(':')
hour = int(start_time)[0]
minute = int(start_time)[1]
second = int(start_time)[2]
s = content['s'] # 接收执行任务的各种参数
# 创建任务
scheduler.add_job(test, 'cron', hour=hour, minute=minute, second=second, args=[s])
code = '200'
message = 'success'
except Exception as e:
code = '400'
message = e
back = {
'code': code,
'message': message
}
return JsonResponse(json.dumps(data, ensure_ascii=False), safe=False)
# 具体要执行的代码
def test(s):
pass
register_events(scheduler)
scheduler.start()
这样就可以由前端用户来手动设置定时任务了。
add_job函数参数说明:
和装饰器的参数大同小异,只是第一个参数不同。
如果具体要执行的函数和调用它的函数在一个文件中,那么只需要传递这个函数名就可以了(如上面的例子)。
但是我习惯将具体的业务代码写到另外一个文件中,view.py中只写前后端交互的接口函数,这种情况下传递的参数为一个字符串,格式为: ‘package.module:some.object’,即 包名.模块:函数名
参考链接:详解django-apscheduler的使用方法
基础组件:
\quad
APScheduler 有四种组件
,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)
。
- schedulers(调度器)
它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。- triggers(触发器)
描述调度任务被触发的条件。不过触发器完全是无状态的。- job stores(作业存储器)
任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中。- executors(执行器)
负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
schedulers(调度器):
它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:
triggers(触发器):
它提供 3种内建的 trigger:
参数 | 说明 |
---|---|
run_date (datetime 或 str) | 作业的运行日期或时间 |
timezone (datetime.tzinfo 或 str) | 指定时区 |
典型范例1:
# 在 2017-12-13 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2020-12-13 14:00:01 时刻运行一次 job_func 方法
scheduler.add_job(job3,"date",run_date='2020-12-13 14:00:01',args=['王飞'],id="job3")
典型范例2:
# 每天0点执行函数的代码,0点的话,hour可以不用写
app.scheduler.add_job(函数名, "cron", hour=0, args=[函数需要传的参数])
#每天凌晨3点执行代码
app.scheduler.add_job(函数名, "cron", hour=3, args=[app])
#如果date后面没有参数的话,就是立刻执行代码,一般测试的时候用
app.scheduler.add_job(函数名, "date", args=[app])
参数 | 说明 |
---|---|
weeks (int) | 间隔几周 |
days (int) | 间隔几天 |
hours (int) | 间隔几小时 |
minutes (int) | 间隔几分钟 |
seconds (int) | 间隔多少秒 |
start_date (datetime 或 str) | 开始日期 |
end_date (datetime 或 str) | 结束日期 |
timezone (datetime.tzinfo 或str) | 时区 |
典型范例:
# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
参数 | 说明 |
---|---|
year (int 或 str) | 年,4位数字 |
month (int 或 str) | 月 (范围1-12) |
day (int 或 str) | 日 (范围1-31 |
week (int 或 str) | 周 (范围1-53) |
day_of_week (int 或 str) | 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 时 (范围0-23) |
minute (int 或 str) | 分 (范围0-59) |
second (int 或 str) | 秒 (范围0-59) |
start_date (datetime 或 str) | 最早开始日期(包含) |
end_date (datetime 或 str) | 最晚结束时间(包含) |
timezone (datetime.tzinfo 或str) | 指定时区 |
这些参数是支持算数表达式,取值格式有如下:
典型范例:
# 在每天的2点35分36分37分 执行 job_func 任务
scheduler.add_job(job4,"cron",hour='2', minute='35-37',args=['王涛'],id="job4")
作业存储(job store):
\quad
有两种添加方法,其中一种是add_job()
, 另一种则是@register_job()修饰器
来修饰函数。
\quad
这个两种办法的区别是:第一种方法返回一个 Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。
典型范例:
@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')·
def test_job():
t_now = time.localtime()
print(t_now)
其他功能:
django-apscheduler框架还提供了很多操作定时任务的函数。比如:
scheduler.remove_job(job_name)
scheduler.pause_job(job_name)
scheduler.resume_job(job_name)
scheduler.get_jobs()
scheduler.modify_job(job_name)
注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。
可以在页面上做一个这样的表格,再加上简单的前后端交互就可以让用户自行管理定时任务:
其他的还有一些辅助功能(包括显示所有任务,显示任务的执行时间等),同学们可以自行查看。
scheduler.reschedule_job(job_name)
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)
执行器(executor):
\quad
执行调度任务的模块。最常用的 executor 是 ThreadPoolExecutor,存储路径是在Django数据库中。
执行器 executors 可以使用线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)
ThreadPoolExecutor(20) # 最多20个线程同时执行
使用方法:
executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BackgroundScheduler(executors=executors)
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
ProcessPoolExecutor(5) # 最多5个进程同时执行
使用方法:
executors = {
'default': ProcessPoolExecutor(3)
}
scheduler = BackgroundScheduler(executors=executors)
参考链接:定时任务APScheduler,随时可以保持数据同步
总结:
django-apscheduler使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,并且作业会存储在数据库中,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。django-apscheduler可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。需要注意的是,apscheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行。
对job的操作:
移除job:
参考链接:Django高级特性:django-apscheduler定时任务
【以下内容未经验证,选择性参考】:摘自定时任务APScheduler,随时可以保持数据同步
监控任务:使用django_apscheduler.jobstores
提供的register_events
监控任务:register_events(scheduler)
程序运行(开启调度器):
scheduler.start()
scheduler.shutdown()
参考范例【未验证】01:
在任意一个app内的views.py中写好定时任务
from apscheduler.scheduler import Scheduler
from time import sleep
def task_Fun():
'''
这里写定时任务
'''
sleep(1)
sched = Scheduler()
@sched.interval_schedule(seconds=6)
def my_task1():
print('定时任务1开始\n')
task_Fun()
print('定时任务1结束\n')
@sched.interval_schedule(hours=4)
def my_task2():
print('定时任务2开始\n')
sleep(1)
print('定时任务2结束\n')
sched.start()
ok。启动django 项目,定时任务就会在你设定的时间执行了
参考链接:Django使用apscheduler完成定时任务
参考范例【未验证】02:
from django.utils import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
#异步式的
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=timezone.get_current_timezone())
#阻塞的,适用于scheduler是独立服务的场景。
#scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
#scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
def myjob():
pass
try:
scheduler.start()
# 5s后执行myjob
# 传入时间去除毫秒
deadline = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=5)
scheduler.add_job(myjob, 'date', run_date=deadline)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
参考链接:django apscheduler在特定时间执行一次任务(run at a specify time only once)
3.0、基本介绍
Celery为分布式任务队列
。侧重实时操作,可用于生产系统处理数以百万计的任务,都用于大型项目,配置和使用较为复杂。Django的分布式主要由Celery框架实现,这是python开发的分布式任务队列。由于它本身不支持消息存储服务,所以需要第三方消息服务来传递任务,一般使用Redis。
3.1、安装依赖
本例建立在认为你已经知道如何使用Celery实现异步任务的基础上,需要学习的请移步 Django使用Celery
本例使用redis作为Borker和backend
pip install -U "celery[redis]"
3.2、使用插件
修改settings.py 增加以下代码
....
# Celery配置
from kombu import Exchange, Queue
# 设置任务接受的类型,默认是{'json'}
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置task任务序列列化为json
CELERY_TASK_SERIALIZER = 'json'
# 请任务接受后存储时的类型
CELERY_RESULT_SERIALIZER = 'json'
# 时间格式化为中国时间
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用UTC时间
CELERY_ENABLE_UTC = False
# 指定borker为redis 如果指定rabbitmq CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 指定存储结果的地方,支持使用rpc、数据库、redis等等,具体可参考文档 # CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo' # mysql 作为后端数据库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 设置任务过期时间 默认是一天,为None或0 表示永不过期
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 设置worker并发数,默认是cpu核心数
# CELERYD_CONCURRENCY = 12
# 设置每个worker最大任务数
CELERYD_MAX_TASKS_PER_CHILD = 100
# 指定任务的位置
CELERY_IMPORTS = (
'base.tasks',
)
# 使用beat启动Celery定时任务
# schedule时间的具体设定参考:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
CELERYBEAT_SCHEDULE = {
'add-every-10-seconds': {
'task': 'base.tasks.cheduler_task',
'schedule': 10,
'args': ('hello', )
},
}
...
3.3、编写定时任务代码
在本例中是在apps/base/tasks.py中增加的定时任务
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
@shared_task
def cheduler_task(name):
print(name)
print(time.strftime('%X'))
3.4、如何使用&运行
启动定时任务beat
celery -A dase_django_api beat -l info
启动Celery worker 用来执行定时任务
celery -A dase_django_api worker -l -l info
注意: 这里的 dase_django_api 要换成你的Celery APP的名称
3.5、Celery插件的优缺点:
4.1、创建定时任务
在apps/base下创建一个文件名为schedules.py;键入一下内容
import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict
def confdict_handle():
while True:
try:
loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('本地时间:'+str(loca_time))
time.sleep(10)
except Exception as e:
print('发生错误,错误信息为:', e)
continue
def main():
'''
主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
'''
try:
# 启动定时任务,多个任务时,使用多线程
task1 = threading.Thread(target=confdict_handle)
task1.start()
except Exception as e:
print('发生异常:%s' % str(e))
if __name__ == '__main__':
main()
4.2、如何使用&运行
直接运行脚本即可
python apps/base/schedules.py
4.3、自建代码的优缺点:
优点:
自定义
高度自由
缺点:
过于简单
当任务过多时,占用资源也会增加
参考链接:Django用定时任务比较不同姿势,使用,的,多种,对比
Crontab和APScheduler
django 定时任务 django-crontab & APScheduler 使用
Python 定时任务(schedule, Apscheduler, celery, python-crontab)-爱代码爱编程
定时任务, linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文两种方式实现定时任务。可以直接参考目录三Django中使用django-apscheduler,目录二内容测试已通过,相关数据库配置暂时无空尝试。
APscheduler全称Advanced Python Scheduler :作用为在指定的时间规则执行指定的作业。
apscheduler的四大组件,分别是Triggers,Job stores,Executors,Schedulers
Scheduler添加job流程:
Scheduler调度流程:
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
作业(任务)存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
注意:
一个任务储存器不要共享给多个调度器,否则会导致状态混乱
调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。
【调度器工作流程
】:
调度器组件详解:
根据开发需求选择相应的组件,下面是不同的调度器组件:
BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。
BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。
GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
TornadoScheduler Tornado调度器,适用于构建Tornado应用。
TwistedScheduler Twisted调度器,适用于构建Twisted应用。
QtScheduler Qt调度器,适用于构建Qt应用。
(1)任务储存器的选择:
要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器MemoryJobStore就可以应付。但是,如果你需要在程序关闭或重启时,保存任务的状态,那么就要选择持久化的任务储存器。如果,作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合与保护功能。
(2)执行器的选择:
同样要看你的实际需求。默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。
配置一个任务,就要设置一个任务触发器。触发器可以设定任务运行的周期、次数和时间。
(3)APScheduler有三种内置的触发器:
date 日期:触发任务运行的具体日期
interval 间隔:触发任务运行的时间间隔
cron 周期:触发任务运行的周期
calendarinterval:当您想要在一天中的特定时间以日历为基础的间隔运行任务时使用
一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。
触发器代码示例:
date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
【参数说明】
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
def my_job(text):
print(text)
# 在2019年4月15日执行
scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['测试任务'])
scheduler.start()
###########################################################################################
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
def my_job(text):
print(text)
# datetime类型(用于精确时间)
scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['测试任务'])
scheduler.start()
注意:run_date参数可以是date类型、datetime类型或文本类型。
scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['测试任务'])
【更多示例代码,参考链接】:python 定时任务APScheduler 使用介绍
【针对上述4个模块的详细介绍,参考博文】Python定时库APScheduler&Django使用django-apscheduler实现定时任务
本文开发内容
作为测试平台而言,定时任务算是必备要素了,只有跑起来的自动化,才能算是真正的自动化。本文将给测试计划添加定时任务功能,具体如下:
前端效果图:
1.为什么要做页面静态化
2.什么是页面静态化
1.首页页面静态化实现步骤
2.首页页面静态化实现
import os
import time
from django.conf import settings
from django.template import loader
from apps.contents.models import ContentCategory
from apps.contents.utils import get_categories
def generate_static_index_html():
"""
生成静态的主页html文件
"""
print('%s: generate_static_index_html' % time.ctime())
# 获取商品频道和分类
categories = get_categories()
# 广告内容
contents = {}
content_categories = ContentCategory.objects.all()
for cat in content_categories:
contents[cat.key] = cat.content_set.filter(status=True).order_by('sequence')
# 渲染模板
context = {
'categories': categories,
'contents': contents
}
# 获取首页模板文件
template = loader.get_template('index.html')
# 渲染首页html字符串
html_text = template.render(context)
# 将首页html字符串写入到指定目录,命名'index.html'
file_path = os.path.join(settings.STATICFILES_DIRS[0], 'index.html')
with open(file_path, 'w', encoding='utf-8') as f:
f.write(html_text)
对于首页的静态化,考虑到页面的数据可能由多名运营人员维护,并且经常变动,所以将其做成定时任务,即定时执行静态化。
在Django执行定时任务,可以通过 django-crontab扩展来实现。
参考链接:定时器任务django-crontab的使用【静态化高频率页面,增加用户体验】【系统的定时器,独立于项目执行】【刘新宇】