diff --git a/docling/app/component/ScheduledTaskManager.py b/docling/app/component/ScheduledTaskManager.py new file mode 100644 index 0000000..8bd6da8 --- /dev/null +++ b/docling/app/component/ScheduledTaskManager.py @@ -0,0 +1,218 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from datetime import datetime +import time +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ScheduledTaskManager: + def __init__(self): + """初始化调度器""" + self.scheduler = BackgroundScheduler() + self.jobs = {} + # 添加“日”周期任务 + def add_second_task(self, task_id,second, task_func, args=None, kwargs=None): + + trigger = CronTrigger(second=second) + job = self.scheduler.add_job( + task_func, trigger, id=task_id, + args=args, kwargs=kwargs + ) + self.jobs[task_id] = job + logger.info(f"已添加每秒任务: {task_id}, 执行时间: {second:02d}") + + # 添加“日”周期任务 + def add_daily_task(self, task_id, hour, minute, task_func, args=None, kwargs=None): + """添加每日定时任务 + + Args: + task_id: 任务唯一标识 + hour: 小时 (0-23) + minute: 分钟 (0-59) + task_func: 任务函数 + args: 位置参数 + kwargs: 关键字参数 + """ + trigger = CronTrigger(hour=hour, minute=minute) + job = self.scheduler.add_job( + task_func, trigger, id=task_id, + args=args, kwargs=kwargs + ) + self.jobs[task_id] = job + logger.info(f"已添加每日任务: {task_id}, 执行时间: {hour:02d}:{minute:02d}") + + # 添加“月”周期任务 + def add_monthly_task(self, task_id, day, hour, minute, task_func, args=None, kwargs=None): + """添加每月定时任务 + + Args: + task_id: 任务唯一标识 + day: 日 (1-31) + hour: 小时 (0-23) + minute: 分钟 (0-59) + """ + trigger = CronTrigger(day=day, hour=hour, minute=minute) + job = self.scheduler.add_job( + task_func, trigger, id=task_id, + args=args, kwargs=kwargs + ) + self.jobs[task_id] = job + logger.info(f"已添加每月任务: {task_id}, 执行时间: 每月{day}日 {hour:02d}:{minute:02d}") + + def add_hourly_task(self, task_id, minute, task_func, args=None, kwargs=None): + """添加每小时定时任务 + + Args: + task_id: 任务唯一标识 + minute: 分钟 (0-59) + """ + trigger = CronTrigger(minute=minute) + job = self.scheduler.add_job( + task_func, trigger, id=task_id, + args=args, kwargs=kwargs + ) + self.jobs[task_id] = job + logger.info(f"已添加每小时任务: {task_id}, 执行时间: 每小时的{minute:02d}分") + + def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None): + """添加间隔任务(备用方案) + + Args: + task_id: 任务唯一标识 + interval_type: 'seconds', 'minutes', 'hours', 'days' + value: 间隔值 + """ + from apscheduler.triggers.interval import IntervalTrigger + + if interval_type == 'seconds': + trigger = IntervalTrigger(seconds=value) + elif interval_type == 'minutes': + trigger = IntervalTrigger(minutes=value) + elif interval_type == 'hours': + trigger = IntervalTrigger(hours=value) + elif interval_type == 'days': + trigger = IntervalTrigger(days=value) + else: + raise ValueError(f"不支持的间隔类型: {interval_type}") + + job = self.scheduler.add_job( + task_func, trigger, id=task_id, + args=args, kwargs=kwargs + ) + self.jobs[task_id] = job + logger.info(f"已添加间隔任务: {task_id}, 间隔: 每{value}{interval_type}") + + # 移除任务 + def remove_task(self, task_id): + """移除任务""" + if task_id in self.jobs: + self.scheduler.remove_job(task_id) + del self.jobs[task_id] + logger.info(f"已移除任务: {task_id}") + + # 暂停任务 + def pause_task(self, task_id): + """暂停任务""" + if task_id in self.jobs: + self.jobs[task_id].pause() + logger.info(f"已暂停任务: {task_id}") + + # 重启任务 + def resume_task(self, task_id): + """恢复任务""" + if task_id in self.jobs: + self.jobs[task_id].resume() + logger.info(f"已恢复任务: {task_id}") + + # 获取全部任务 + def get_all_tasks(self): + """获取所有任务信息""" + tasks_info = [] + for job_id, job in self.jobs.items(): + tasks_info.append({ + 'id': job_id, + 'name': job.name, + 'next_run_time': job.next_run_time, + 'trigger': str(job.trigger) + }) + return tasks_info + + #启动任务 + def start(self): + """启动调度器""" + self.scheduler.start() + logger.info("调度器已启动") + + # 关闭 + def shutdown(self): + """关闭调度器""" + self.scheduler.shutdown() + logger.info("调度器已关闭") + + +# 示例任务函数 +def sample_task(task_name, *args, **kwargs): + """示例任务函数""" + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{now}] 执行任务: {task_name}") + if args: + print(f"位置参数: {args}") + if kwargs: + print(f"关键字参数: {kwargs}") + print("-" * 50) + print() + + +# 使用示例 +if __name__ == "__main__": + # 创建任务管理器 + manager = ScheduledTaskManager() + + # 启动调度器 + manager.start() + + try: + # 添加每日任务 (每天10:30执行) + manager.add_second_task( + task_id="daily_report", + second=10, + task_func=sample_task, + args=("每日报告",), + kwargs={"type": "second"} + ) + # + # # 添加每月任务 (每月1日9:00执行) + # manager.add_monthly_task( + # task_id="monthly_summary", + # day=1, + # hour=9, 12354354345321.21135432213245322154373554 + # minute=0, + # task_func=sample_task, + # args=("月度汇总",) + # ) + # + # # 添加每小时任务 (每小时的第15分钟执行) + # manager.add_hourly_task( + # task_id="hourly_check", + # minute=15, + # task_func=sample_task, + # args=("每小时检查",) + # ) + + # 查看任务列表 + print("当前任务列表:") + for task in manager.get_all_tasks(): + print(f" - {task['id']}: 下次执行时间: {task['next_run_time']}") + + # 保持程序运行 + while True: + time.sleep(1) + + except KeyboardInterrupt: + print("\n收到中断信号,正在关闭...") + finally: + manager.shutdown() \ No newline at end of file diff --git a/docling/requirements.txt b/docling/requirements.txt index 63a7484..3014c5c 100644 --- a/docling/requirements.txt +++ b/docling/requirements.txt @@ -26,3 +26,4 @@ safetensors scipy opencv-python pymupdf +apscheduler