Files
FunMD_Convert/docling/app/core/ScheduledTaskManager.py

157 lines
4.6 KiB
Python

import logging
import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ScheduledTaskManager:
def __init__(self):
"""初始化调度器"""
self.scheduler = BackgroundScheduler()
self.jobs = {}
# 添加任务
def _add_task(self, task_id,task_name, trigger, task_func, args=None, kwargs=None):
job = self.scheduler.add_job(
task_func, trigger, id=task_id,
args=args, kwargs=kwargs
)
self.jobs[task_id] = job
logger.info(f"已添加{task_name}任务 taskId: {task_id}, 执行时间: {trigger}")
def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None):
"""添加间隔任务(备用方案)
Args:
task_id: 任务唯一标识
interval_type: 'seconds', 'minutes', 'hours', 'days'
value: 间隔值
"""
from apscheduler.triggers.interval import IntervalTrigger
if interval_type == 'seconds':
trigger = IntervalTrigger(seconds=value)
elif interval_type == 'minutes':
trigger = IntervalTrigger(minutes=value)
elif interval_type == 'hours':
trigger = IntervalTrigger(hours=value)
elif interval_type == 'days':
trigger = IntervalTrigger(days=value)
else:
raise ValueError(f"不支持的间隔类型: {interval_type}")
job = self.scheduler.add_job(
task_func, trigger, id=task_id,
args=args, kwargs=kwargs
)
self.jobs[task_id] = job
logger.info(f"已添加间隔任务: {task_id}, 间隔: 每{value}{interval_type}")
# 移除任务
def remove_task(self, task_id):
"""移除任务"""
if task_id in self.jobs:
self.scheduler.remove_job(task_id)
del self.jobs[task_id]
logger.info(f"已移除任务: {task_id}")
# 暂停任务
def pause_task(self, task_id):
"""暂停任务"""
if task_id in self.jobs:
self.jobs[task_id].pause()
logger.info(f"已暂停任务: {task_id}")
# 重启任务
def resume_task(self, task_id):
"""恢复任务"""
if task_id in self.jobs:
self.jobs[task_id].resume()
logger.info(f"已恢复任务: {task_id}")
# 获取全部任务
def get_all_tasks(self):
"""获取所有任务信息"""
tasks_info = []
for job_id, job in self.jobs.items():
tasks_info.append({
'id': job_id,
'name': job.name,
'next_run_time': job.next_run_time,
'trigger': str(job.trigger)
})
return tasks_info
#启动任务
def start(self):
"""启动调度器"""
self.scheduler.start()
logger.info("调度器已启动")
# 关闭
def shutdown(self):
"""关闭调度器"""
self.scheduler.shutdown()
logger.info("调度器已关闭")
# 示例任务函数
def sample_task(task_name, *args, **kwargs):
"""示例任务函数"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{now}] 执行任务: {task_name}")
if args:
print(f"位置参数: {args}")
if kwargs:
print(f"关键字参数: {kwargs}")
print("-" * 50)
print()
# 使用示例
if __name__ == "__main__":
# 创建任务管理器
manager = ScheduledTaskManager()
# 启动调度器
manager.start()
try:
# 添加每日任务 (每天10:30执行)
# # 添加每月任务 (每月1日9:00执行)
# manager.add_monthly_task(
# task_id="monthly_summary",
# day=1,
# hour=9, 12354354345321.21135432213245322154373554
# minute=0,
# task_func=sample_task,
# args=("月度汇总",)
# )
#
# # 添加每小时任务 (每小时的第15分钟执行)
# manager.add_hourly_task(
# task_id="hourly_check",
# minute=15,
# task_func=sample_task,
# args=("每小时检查",)
# )
# 查看任务列表
print("当前任务列表:")
for task in manager.get_all_tasks():
print(f" - {task['id']}: 下次执行时间: {task['next_run_time']}")
# 保持程序运行
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n收到中断信号,正在关闭...")
finally:
manager.shutdown()