Celerybeat任务重复执行

前言

celery有时任务会重复执行,这时就需要一步一步排查。任务重复首先排除以下几项:

  1. 启动多个celery beat
  2. 任务时间过长,超过了visibility_timeout

celerybeat重复任务

复现:某个任务每日10点执行,现时是11点,这个任务今日应已执行,last_run_at=今日10点,现在将该任务执行时间改为每日10:30,然后重启,这时beat会立即发送该任务给worker执行。

其实这也不算是重复执行,把任务执行时间重新设置在上次执行时间和现在之间,celery beat会在重启后立即执行

源码分析

beat启动流程

beat启动: celery beat -A app –schedule=celerybeat-schedule -l info
启动流程:

  1. Beat类中调用EmbeddedService,采用多进程还是多线程的方式
  2. 开启进程或线程,调用Service类
  3. Service类start死循环,每interval同步schedule

具体的参考

beat重复执行

beat启动需要一个Schedule,它的作用是定时任务的持久化,默认是使用shelve库,写在celerybeat-schedule文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# celerybeat-schedule
('__version__', '3.1.25')
('entries', {'task': <Entry: task task() <crontab: 0 1 * * * (m/h/d/dM/MY)>)
('tz', 'Asia/Shanghai')
('utc_enabled', True)

# 单个entry详情,里面记录了上次执行的时间和总共执行次数
$ docs['entries']['task'].__dict__
{'app': None,
'args': (),
'kwargs': {},
'last_run_at': datetime.datetime(2019, 7, 11, 0, 0, 0, 78524),
'name': 'task',
'options': {},
'schedule': <crontab: 0 1 * * * (m/h/d/dM/MY)>,
'task': 'task',
'total_run_count': 150}

它每次重启时,会加载上次的文件然后新的配置合并,相关代码

1
2
3
4
5
6
7
8
9
10
11
def merge_inplace(self, b):
schedule = self.schedule
A, B = set(schedule), set(b)
for key in A ^ B:
schedule.pop(key, None)
for key in B:
entry = self.Entry(**dict(b[key], name=key, app=self.app))
if schedule.get(key):
schedule[key].update(entry)
else:
schedule[key] = entry

当我们的任务时间重新设置在last_run_at和now之间,看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def maybe_due(self, entry, publisher=None):
is_due, next_time_to_run = entry.is_due()

if is_due: # 当is_due 为true时,就执行任务
info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
try:
result = self.apply_async(entry, publisher=publisher)
except Exception as exc:
error('Message Error: %s\n%s',
exc, traceback.format_stack(), exc_info=True)
else:
debug('%s sent. id->%s', entry.task, result.id)
return next_time_to_run

def is_due(self, last_run_at):
"""根据last_run_at 计算剩余的时间"""
rem_delta = self.remaining_estimate(last_run_at)
rem = timedelta_seconds(rem_delta)
due = rem == 0
if due:
rem_delta = self.remaining_estimate(self.now())
rem = timedelta_seconds(rem_delta)
return schedstate(due, rem)

def remaining(start, ends_in, now=None, relative=False):
"""根据上次执行时间,下次执行剩余时间,now计算剩余时间"""
now = now or datetime.utcnow()
end_date = start + ends_in
if relative:
end_date = delta_resolution(end_date, ends_in)
ret = end_date - now
if C_REMDEBUG: # pragma: no cover
print('rem: NOW:%r START:%r ENDS_IN:%r END_DATE:%s REM:%s' % (
now, start, ends_in, end_date, ret))
return ret

代码就那样,我还是直接说吧,原理就是celery计算下次执行时间,是根据last_run_at开始计算的,不是从现在。
解释那个复现的例子,任务改为10:30,根据上次last_run_at计算周期,下次是10:30执行,10:30减去11:00小于0,立即执行。

解决

  1. 手动删除celerybeat-schedule文件
  2. beat_init signal
1
2
3
4
5
6
7
8
from celery.signals import beat_init

@beat_init.connect
def remove_celerybeat_schedule(sender=None, **kwargs):
"""重启beat,重新生成celerybeat-schedule"""
if sender:
sender.scheduler._remove_db()
sender.scheduler.setup_schedule()

3.修改源码逻辑

参考

https://docs.python.org/zh-cn/3/library/shelve.html
http://www.pythondoc.com/celery-3.1.11/index.html
https://liqiang.io/post/celery-source-analysis-worker-start-flow

----------本文完,感谢您的阅读----------