import json
import pytz
from django_celery_beat.models import PeriodicTask, PeriodicTasks, CrontabSchedule
from phone.models import PhoneInfo
def task_add(task_name, task_class, task_queue, interval=None, crontab=None, args=None, kwargs=None):
    """
    Register a new, enabled Celery beat periodic task.

    The row is written with a single ``create()`` call so that the task is
    never stored in a half-configured state (enabled, but still missing its
    queue and arguments) and only one write is issued.

    :param task_name: name stored on the periodic task
    :param task_class: value stored in the ``task`` field (the task to run)
    :param task_queue: value used for both the ``queue`` and ``exchange`` fields
    :param interval: interval schedule, or None
    :param crontab: crontab schedule, or None
    :param args: positional arguments, JSON-encoded before storing; defaults to []
    :param kwargs: keyword arguments, JSON-encoded before storing; defaults to {}
    :return: id of the created periodic task
    """
    task = PeriodicTask.objects.create(
        name=task_name,
        task=task_class,
        interval=interval,
        crontab=crontab,
        queue=task_queue,
        exchange=task_queue,
        routing_key='default',
        enabled=True,
        args=json.dumps([] if args is None else args),
        kwargs=json.dumps({} if kwargs is None else kwargs),
    )
    # Explicitly flag the change for the beat scheduler, as the original did.
    PeriodicTasks.changed(task)
    return task.id
def task_update(task_id, crontab):
    """
    Point an existing periodic task at a different crontab schedule.

    :param task_id: id of the periodic task to modify
    :param crontab: crontab schedule to assign to the task
    :return: None
    """
    periodic_task = PeriodicTask.objects.get(id=task_id)
    periodic_task.crontab = crontab
    periodic_task.save()
    # Flag the change so the beat scheduler reloads its schedule.
    PeriodicTasks.changed(periodic_task)
def task_delete(task_id):
    """
    Disable and then delete the periodic task with the given id.

    If no such task exists, a message is printed and nothing else happens.

    :param task_id: id of the periodic task to remove
    :return: None
    """
    try:
        doomed_task = PeriodicTask.objects.get(id=task_id)
    except PeriodicTask.DoesNotExist:
        print(f'{task_id}ID 不存在')
        return

    # Switch the task off and persist that before removing the row.
    doomed_task.enabled = False
    doomed_task.save()
    doomed_task.delete()
def task_list(task_name, task_queue):
    """
    Look up a single periodic task by name and queue and summarise it.

    :param task_name: name of the periodic task
    :param task_queue: queue the periodic task is assigned to
    :return: dict with the keys ``task_name``, ``task_queue``, ``task_kwargs``,
             ``task_class`` (the task's ``task`` field) and ``task_cron``
             (the task's ``crontab`` attribute)
    """
    # ``get`` is used, so exactly one matching task is expected.
    matched = PeriodicTask.objects.get(name=task_name, queue=task_queue)
    return {
        "task_name": matched.name,
        "task_queue": matched.queue,
        "task_kwargs": matched.kwargs,
        "task_class": matched.task,
        "task_cron": matched.crontab,
    }
def queue_update(queue_name_pre, queue_name_cur):
    """
    Move every periodic task from one queue to another.

    For each task on ``queue_name_pre`` the ``queue``, ``exchange`` and
    ``routing_key`` fields are all set to ``queue_name_cur``.

    :param queue_name_pre: name of the queue to move tasks away from
    :param queue_name_cur: name of the queue to move tasks to
    :return: queryset of all periodic tasks whose queue is ``queue_name_cur``
    """
    # Materialise the matches once: the rows are already loaded, so there is
    # no need to fetch each one again by id, and saving changes the very
    # field the filter is based on.
    tasks_to_move = list(PeriodicTask.objects.filter(queue=queue_name_pre))
    for task in tasks_to_move:
        task.queue = queue_name_cur
        task.exchange = queue_name_cur
        task.routing_key = queue_name_cur
        task.save()
    # Flag the change so the beat scheduler reloads its schedule.
    PeriodicTasks.update_changed()
    return PeriodicTask.objects.filter(queue=queue_name_cur)
def get_cron_obj(plan_time):
    """
    Return the crontab schedule matching ``plan_time``, creating it if needed.

    The schedule is stored with the Asia/Shanghai timezone, so a
    timezone-aware ``plan_time`` is converted to that zone before its fields
    are read; otherwise the wall-clock fields of another zone (e.g. UTC)
    would be stored as if they were Shanghai time. A naive ``plan_time`` is
    used unchanged.

    :param plan_time: datetime-like object providing ``minute``, ``hour``,
                      ``day`` and ``month``
    :return: the matching ``CrontabSchedule`` object
    """
    shanghai = pytz.timezone('Asia/Shanghai')
    if getattr(plan_time, 'tzinfo', None) is not None and plan_time.utcoffset() is not None:
        plan_time = plan_time.astimezone(shanghai)
    cron_obj, _ = CrontabSchedule.objects.get_or_create(
        minute=plan_time.minute,
        hour=plan_time.hour,
        day_of_week='*',
        day_of_month=plan_time.day,
        month_of_year=plan_time.month,
        timezone=shanghai,
    )
    return cron_obj
class PlanTask(object):
    """
    Keep the Celery beat periodic task of a plan instance in sync with it.

    ``instance`` is expected to provide ``pk``, ``plan_time``, ``task_id``,
    ``is_enable`` and ``save(update_fields=...)``; these are the only
    attributes this class touches.
    """

    # Supported ``model`` keys -> (task-name prefix, Celery task path).
    _TASK_SPECS = {
        "model0": ("model0-计划", "model0.tasks.model_0"),
        "model1": ("model1-计划", "model1.tasks.model_1"),
    }

    def __init__(self, model, instance):
        """
        Build the keyword arguments later passed to ``task_add``.

        :param model: key selecting the task definition ("model0" or "model1")
        :param instance: plan object the periodic task belongs to
        :raises ValueError: if ``model`` is not a supported key
        """
        spec = self._TASK_SPECS.get(model) if isinstance(model, str) else None
        if spec is None:
            # Fail here with a clear message instead of an AttributeError on
            # ``self.data`` further down.
            raise ValueError(f"unsupported plan model: {model!r}")
        name_prefix, task_class = spec
        self.instance = instance
        self.data = {
            "task_name": f"{name_prefix}{instance.pk}",
            "task_class": task_class,
            "task_queue": "manager",
            "interval": None,
            "crontab": self.get_cron_obj(instance.plan_time),
            "args": [instance.pk],
        }

    @staticmethod
    def get_cron_obj(plan_time):
        """Return the crontab schedule for ``plan_time`` via the module-level ``get_cron_obj``."""
        return get_cron_obj(plan_time)

    def implement_task_add(self):
        """Create the periodic task and store its id on the instance."""
        self.instance.task_id = task_add(**self.data)
        self.instance.save(update_fields=['task_id'])

    def implement_task_update(self, validated_data):
        """
        Bring the periodic task in line with the instance's current state.

        Disabled instance: an existing periodic task is deleted and the
        stored ``task_id`` is cleared. Enabled instance: a missing periodic
        task is created, and if ``plan_time`` is among the updated fields the
        task's crontab is replaced.

        :param validated_data: mapping of the fields that were updated
        """
        instance = self.instance
        if not instance.is_enable:
            if instance.task_id:
                task_delete(instance.task_id)
                instance.task_id = None
                instance.save(update_fields=['task_id'])
            return

        if instance.task_id is None:
            # Enabled but no periodic task yet: create one.
            self.implement_task_add()
        if instance.task_id and 'plan_time' in validated_data:
            # Re-derive the crontab from the instance's current plan time.
            task_update(instance.task_id, self.get_cron_obj(instance.plan_time))
if __name__ == '__main__':
    # Manual walk-through of the plan lifecycle using the first PhoneInfo row.
    phone = PhoneInfo.objects.first()
    updated_fields = {"plan_time": "2020-02-25T16:23:26.091538+08:00"}

    # Create the plan's periodic task.
    add_plan = PlanTask(model='model1', instance=phone)
    add_plan.implement_task_add()

    # Update the plan's periodic task.
    update_plan = PlanTask(model='model1', instance=phone)
    update_plan.implement_task_update(updated_fields)

    # Delete the plan's periodic task.
    task_delete(phone.task_id)