追逐繁星的孩子

お帰りなさい

首页 标签 归档 分类 关于
Python定时任务库Schedule详解
日期 2017-08-02   |    标签 Python   |    评论

最近在做数据同步方面事情,其中需要通过python定时执行一些数据同步任务,github到schedule这个库。其在github上面有3000+的stars,于是乎看了源码并且研究了一下,感觉蛮有意思的。所以为了下次能更好的使用它,闲来没事整理了一下自己的见解,也算是加深了对这个模块的印象。这里的一些介绍都是以代码注释的方式给出。

schedule的使用其实蛮简单的,作者在文档中给出了下面的demo。

import schedule  #---------------------------------1
import time

def job(message='stuff'):   #----------------------2
    print("I'm working on:", message) 

schedule.every(10).minutes.do(job)   #-------------3
schedule.every().hour.do(job, message='things')
schedule.every().day.at("10:30").do(job)

while True:
    schedule.run_pending()    #-----------------4
    time.sleep(1)
  1. 导入schedule库
  2. 定义需要定时执行的方法
  3. 根据自己的需求选择不同的定时方案(按天?按小时?按分钟?等等等等)
  4. 循环执行schedule.run_pending(),间隔为1秒

现在让我们来看看它的实现吧!

首先是schedule内部定义的三个类:CancelJob、Scheduler、Job。主要介绍Scheduler和Job。

Scheduler(定时类)用来调度Job

# 初始化schedule类时初始化jobs列表,用于存放Job。
# 由于我们可能需要实现不只一个任务的定时执行,故这边使用列表来存放批量Job。
def __init__(self):
    self.jobs = []

# 删选出当前需要执行的任务(可能有多个)并执行
# 这边解释下为什么需要sorted排序。因为任务的执行是线性的,当一个任务再执行时间长的时候,
# 会产生一个情况,即它执行的时候,后续由多个任务同时满足执行条件,但是由于当前任务还在执行,
# 所以他们只能等待,等到前面的任务执行完毕,后面的任务就可以去执行了,那么其中就会有先来后到,
# 这里为了保证先到的任务先执行,所以要排下序
def run_pending(self):
    runnable_jobs = (job for job in self.jobs if job.should_run)
    for job in sorted(runnable_jobs):
        self._run_job(job)

# 直接执行任务列表中的所有任务
def run_all(self, delay_seconds=0):
    for job in self.jobs[:]:
        self._run_job(job)
        time.sleep(delay_seconds)

# 根据tag条件清空jobs列表,若未定义tag,则全部清空
def clear(self, tag=None):
    if tag is None:
        del self.jobs[:]
    else:
        self.jobs[:] = (job for job in self.jobs if tag not in job.tags)

# 移除不需要定时执行的任务
def cancel_job(self, job):
    try:
        self.jobs.remove(job)
    except ValueError:
        pass

# 配置一个job 具体看job类
def every(self, interval=1):
    job = Job(interval, self)
    return job

# 调用job.run()执行任务。这里if后面的我觉得并没有什么卵用,无法cancel job。
# 不过你可以自己实现cancel job
# def _run_job(self, job):
# try:
#   job.run()
# except:
#   self.cancel_job(job)
# 比如像上面那样
def _run_job(self, job):
    ret = job.run()
    if isinstance(ret, CancelJob) or ret is CancelJob:
        self.cancel_job(job)

# 获取下次待执行的任务(下次执行时间距离当前时间最近的任务)的执行时间点
@property
def next_run(self):
    if not self.jobs:
        return None
    return min(self.jobs).next_run

# 根据next_run()获取下次任务执行还需要的秒数
@property
def idle_seconds(self):
    return (self.next_run - datetime.datetime.now()).total_seconds()

Job(任务类)用来定义任务执行相关参数

以下只显示我认为必要的方法

def __init__(self, interval, scheduler=None):
    self.interval = interval      # 定义任务执行的时间间隔
    self.job_func = None          # 定时执行的方法名
    self.unit = None              # 定义间隔的度(按天?按小时?按分钟?等等)
    self.at_time = None           # 定时执行的时间点
    self.last_run = None          # 上次执行的时间
    self.next_run = None          # 下次执行的时间
    self.period = None            # 执行周期(用于时间差值计算)
    self.start_day = None         # 定义一周中某一天执行
    self.tags = set()             # tag集合 
    self.scheduler = scheduler    # Scheduler

# 用于Job类型比较(根据下次执行时间)
def __lt__(self, other):
    return self.next_run < other.next_run   

# 定义这个属性代表间隔度为秒
@property
def second(self):
    assert self.interval == 1, 'Use seconds instead of second'
    return self.seconds

# 返回Job方便后续调用
@property
def seconds(self):
    self.unit = 'seconds'
    return self

# 后续是分钟、小时、天等等,不多举例
#......

def tag(self, *tags):
    if any([not isinstance(tag, collections.Hashable) for tag in tags]):
        raise TypeError('Every tag should be hashable')
    if not all(isinstance(tag, collections.Hashable) for tag in tags):
        raise TypeError('Tags must be hashable')
    self.tags.update(tags)
    return self

# 这里只能定义小时和分钟,不能精确到秒
def at(self, time_str):
    assert self.unit in ('days', 'hours') or self.start_day
    hour, minute = time_str.split(':')
    minute = int(minute)
    if self.unit == 'days' or self.start_day:
        hour = int(hour)
        assert 0 <= hour <= 23
    elif self.unit == 'hours':
        hour = 0
    assert 0 <= minute <= 59
    self.at_time = datetime.time(hour, minute)
    return self

# 这步之前加上这步已经把Job的参数配置完毕,并添加到Scheduler的Job列表中等待执行
# 这边用到了functools的方法。主要为了调用方法更简洁一点。
def do(self, job_func, *args, **kwargs):
    self.job_func = functools.partial(job_func, *args, **kwargs)
    try:
        functools.update_wrapper(self.job_func, job_func)
    except AttributeError:
        pass
    self._schedule_next_run()
    self.scheduler.jobs.append(self)
    return self

# 判断是否符合执行条件
@property
def should_run(self):
    return datetime.datetime.now() >= self.next_run

# 任务执行,记录执行时间,并记录下次执行时间
def run(self):
    ret = self.job_func()
    self.last_run = datetime.datetime.now()
    self._schedule_next_run()
    return ret

# 这边是计算下次执行时间的逻辑,代码略多,这边就不展开了。
def _schedule_next_run(self):

模块方法

# 模块内部创建可调用类 
default_scheduler = Scheduler()
jobs = default_scheduler.jobs

def every(interval=1):
    return default_scheduler.every(interval)

def run_pending():
    default_scheduler.run_pending()

def run_all(delay_seconds=0):
    default_scheduler.run_all(delay_seconds=delay_seconds)

def clear(tag=None):
    default_scheduler.clear(tag)

def cancel_job(job):
    default_scheduler.cancel_job(job)

def next_run():
    return default_scheduler.next_run

def idle_seconds():
    return default_scheduler.idle_seconds

这里作者为我们新建了对象,实现了调用的方法。使得模块的使用更加简单,代码更加简洁。

注意:

  1. 多个任务的执行是线性的,比如A,B两个任务同时满足执行要求,如果A需要30秒执行完,那么B就只能等待A执行完之后执行。
  2. 其中一个任务出错,整个定时任务报错退出,所以务必自己处理需要定时执行的任务的异常。如果不能保证的话,还是把多个任务分多个定时任务跑吧。