diff --git a/docling/app/core/ScheduledTask.py b/docling/app/core/ScheduledTask.py new file mode 100644 index 0000000..9519e05 --- /dev/null +++ b/docling/app/core/ScheduledTask.py @@ -0,0 +1,205 @@ +""" +定时任务基类 + +提供 crontab 表达式到 APScheduler CronTrigger 的转换功能 +以及便捷的任务添加方法 +""" + +import logging +from typing import Optional + +from app.core.ScheduledTaskManager import ScheduledTaskManager +from apscheduler.triggers.cron import CronTrigger + +logger = logging.getLogger(__name__) + + +class ScheduledTask: + """ + 定时任务基类 + + 提供将标准 crontab 表达式转换为 APScheduler CronTrigger 对象的功能 + + Crontab 表达式格式:分 时 日 月 周 + 例如: + - "0 2 * * *" -> 每天凌晨 2:00 执行 + - "*/5 * * * *" -> 每 5 分钟执行一次 + - "0 0 * * 0" -> 每周日 00:00 执行 + - "0 0 1 * *" -> 每月 1 日 00:00 执行 + """ + + def __init__(self, task_manager: Optional[ScheduledTaskManager] = None): + """ + 初始化定时任务 + + Args: + task_manager: 可选的调度任务管理器实例,如果不提供则需要外部提供 + """ + self.task_manager = task_manager + + def add_cron_task( + self, + task_id: str, + task_name: str, + crontab: str, + task_func, + args=None, + kwargs=None + ): + """ + 通过 crontab 表达式添加定时任务 + + Args: + task_id: 任务唯一标识符 + task_name: 任务名称 + crontab: crontab 表达式字符串,格式为 "分 时 日 月 周" + 例如: "0 2 * * *" 表示每天凌晨 2 点执行 + task_func: 要执行的任务函数 + args: 传递给任务函数的位置参数(元组) + kwargs: 传递给任务函数的关键字参数(字典) + + Returns: + bool: 添加成功返回 True,失败返回 False + + 示例: + task = ScheduledTask(task_manager) + task.add_cron_task( + task_id="daily_backup", + task_name="每日备份", + crontab="0 2 * * *", + task_func=backup_function, + args=() + ) + """ + if self.task_manager is None: + logger.error("任务管理器未初始化,无法添加任务") + return False + + try: + # 将 crontab 表达式转换为 CronTrigger + trigger = self.covert(crontab) + if trigger is None: + logger.error(f"crontab 表达式转换失败: {crontab}") + return False + + # 使用调度器添加任务 + job = self.task_manager.scheduler.add_job( + task_func, + trigger, + id=task_id, + name=task_name, + args=args, + kwargs=kwargs + ) + + # 记录到 jobs 字典中 + self.task_manager.jobs[task_id] = job + + logger.info(f"已添加定时任务: {task_name} ({task_id}), crontab: {crontab}") + return True + + except Exception as e: + logger.error(f"添加定时任务失败: {e}") + return False + + def covert(self, crontab: str) -> Optional[CronTrigger]: + """ + 将字符串的 crontab 表达式转换为 CronTrigger 对象 + + Args: + crontab: crontab 表达式字符串 + 格式: "分钟 小时 日期 月份 星期" + 支持的特殊字符: * (任意值), / (间隔), - (范围), , (列表) + + Returns: + Optional[CronTrigger]: CronTrigger 对象,解析失败返回 None + + Crontab 字段说明: + 字段 取值范围 特殊字符 + 分钟 0-59 * / - , + 小时 0-23 * / - , + 日期 1-31 * / - , + 月份 1-12 * / - , + 星期 0-7 (0和7都代表周日) * / - , + + 示例: + "0 2 * * *" -> 每天凌晨 2:00 + "*/5 * * * *" -> 每 5 分钟 + "0 0 * * 0" -> 每周日 00:00 + "0 0 1 * *" -> 每月 1 日 00:00 + "30 8 * * 1-5" -> 工作日早上 8:30 + "0 9-17 * * 1-5" -> 工作日每小时 9:00-17:00 + """ + if not crontab or not isinstance(crontab, str): + logger.error(f"无效的 crontab 表达式: {crontab}") + return None + + try: + # 去除首尾空格并按空格分割 + parts = crontab.strip().split() + + # 验证格式:crontab 表达式应该有 5 或 6 个部分 + # 5 部分: 分 时 日 月 周 + # 6 部分: 分 时 日 月 周 年 (可选) + if len(parts) < 5 or len(parts) > 6: + logger.error(f"crontab 表达式格式错误,应为 5 或 6 个部分,实际为 {len(parts)} 个: {crontab}") + return None + + # 解析各个字段 + minute = parts[0] # 分钟: 0-59 + hour = parts[1] # 小时: 0-23 + day = parts[2] # 日期: 1-31 + month = parts[3] # 月份: 1-12 + day_of_week = parts[4] # 星期: 0-7 (0 和 7 都代表周日) + + # 处理星期字段的特殊值 + # 在 crontab 中,0 和 7 都代表周日 + # 在 APScheduler 中,周日使用 6 (mon=0, tue=1, ..., sun=6) + # 需要进行转换 + if day_of_week in ['0', '7']: + day_of_week = '6' # 转换为 APScheduler 的周日表示 + + # 创建 CronTrigger 对象 + trigger = CronTrigger( + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week + ) + + logger.debug(f"成功转换 crontab 表达式: {crontab} -> {trigger}") + return trigger + + except ValueError as e: + logger.error(f"crontab 表达式值无效: {crontab}, 错误: {e}") + return None + except Exception as e: + logger.error(f"转换 crontab 表达式失败: {crontab}, 错误: {e}") + return None + + +# 使用示例 +if __name__ == "__main__": + # 创建任务实例 + task = ScheduledTask() + + # 测试各种 crontab 表达式 + test_cases = [ + "0 2 * * *", # 每天凌晨 2 点 + "*/5 * * * *", # 每 5 分钟 + "0 0 * * 0", # 每周日 + "0 0 1 * *", # 每月 1 日 + "30 8 * * 1-5", # 工作日早上 8:30 + "0 9-17 * * 1-5", # 工作日每小时 + ] + + print("Crontab 表达式转换测试:") + print("-" * 60) + + for crontab in test_cases: + trigger = task.covert(crontab) + if trigger: + print(f"✓ {crontab:20s} -> {trigger}") + else: + print(f"✗ {crontab:20s} -> 转换失败") diff --git a/docling/app/component/ScheduledTaskManager.py b/docling/app/core/ScheduledTaskManager.py similarity index 65% rename from docling/app/component/ScheduledTaskManager.py rename to docling/app/core/ScheduledTaskManager.py index 8bd6da8..c807b0e 100644 --- a/docling/app/component/ScheduledTaskManager.py +++ b/docling/app/core/ScheduledTaskManager.py @@ -1,8 +1,8 @@ -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.cron import CronTrigger -from datetime import datetime -import time import logging +import time +from datetime import datetime + +from apscheduler.schedulers.background import BackgroundScheduler # 配置日志 logging.basicConfig(level=logging.INFO) @@ -14,69 +14,16 @@ class ScheduledTaskManager: """初始化调度器""" self.scheduler = BackgroundScheduler() self.jobs = {} - # 添加“日”周期任务 - def add_second_task(self, task_id,second, task_func, args=None, kwargs=None): - trigger = CronTrigger(second=second) + # 添加任务 + def _add_task(self, task_id,task_name, trigger, task_func, args=None, kwargs=None): + job = self.scheduler.add_job( task_func, trigger, id=task_id, args=args, kwargs=kwargs ) self.jobs[task_id] = job - logger.info(f"已添加每秒任务: {task_id}, 执行时间: {second:02d}") - - # 添加“日”周期任务 - def add_daily_task(self, task_id, hour, minute, task_func, args=None, kwargs=None): - """添加每日定时任务 - - Args: - task_id: 任务唯一标识 - hour: 小时 (0-23) - minute: 分钟 (0-59) - task_func: 任务函数 - args: 位置参数 - kwargs: 关键字参数 - """ - trigger = CronTrigger(hour=hour, minute=minute) - job = self.scheduler.add_job( - task_func, trigger, id=task_id, - args=args, kwargs=kwargs - ) - self.jobs[task_id] = job - logger.info(f"已添加每日任务: {task_id}, 执行时间: {hour:02d}:{minute:02d}") - - # 添加“月”周期任务 - def add_monthly_task(self, task_id, day, hour, minute, task_func, args=None, kwargs=None): - """添加每月定时任务 - - Args: - task_id: 任务唯一标识 - day: 日 (1-31) - hour: 小时 (0-23) - minute: 分钟 (0-59) - """ - trigger = CronTrigger(day=day, hour=hour, minute=minute) - job = self.scheduler.add_job( - task_func, trigger, id=task_id, - args=args, kwargs=kwargs - ) - self.jobs[task_id] = job - logger.info(f"已添加每月任务: {task_id}, 执行时间: 每月{day}日 {hour:02d}:{minute:02d}") - - def add_hourly_task(self, task_id, minute, task_func, args=None, kwargs=None): - """添加每小时定时任务 - - Args: - task_id: 任务唯一标识 - minute: 分钟 (0-59) - """ - trigger = CronTrigger(minute=minute) - job = self.scheduler.add_job( - task_func, trigger, id=task_id, - args=args, kwargs=kwargs - ) - self.jobs[task_id] = job - logger.info(f"已添加每小时任务: {task_id}, 执行时间: 每小时的{minute:02d}分") + logger.info(f"已添加{task_name}任务 taskId: {task_id}, 执行时间: {trigger}") def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None): """添加间隔任务(备用方案) @@ -177,14 +124,6 @@ if __name__ == "__main__": try: # 添加每日任务 (每天10:30执行) - manager.add_second_task( - task_id="daily_report", - second=10, - task_func=sample_task, - args=("每日报告",), - kwargs={"type": "second"} - ) - # # # 添加每月任务 (每月1日9:00执行) # manager.add_monthly_task( # task_id="monthly_summary", diff --git a/docling/app/core/initializer.py b/docling/app/core/initializer.py deleted file mode 100644 index 9db3533..0000000 --- a/docling/app/core/initializer.py +++ /dev/null @@ -1,329 +0,0 @@ -""" -服务启动自动初始化模块 - -提供统一的服务启动初始化框架,支持: -1. 顺序初始化:按依赖顺序执行初始化任务 -2. 并发初始化:无依赖的任务可并发执行 -3. 失败重试:初始化失败可配置重试策略 -4. 健康检查:初始化完成后进行健康检查 -""" - -import asyncio -import logging -from abc import ABC, abstractmethod -from typing import List, Optional, Callable, Any, Dict -from dataclasses import dataclass, field -from enum import Enum -import traceback - -logger = logging.getLogger(__name__) - - -class InitStatus(Enum): - """初始化状态""" - PENDING = "pending" - RUNNING = "running" - SUCCESS = "success" - FAILED = "failed" - SKIPPED = "skipped" - - -@dataclass -class InitResult: - """初始化结果""" - name: str - status: InitStatus - message: str = "" - error: Optional[Exception] = None - duration_ms: float = 0.0 - - def __str__(self): - status_emoji = { - InitStatus.SUCCESS: "✓", - InitStatus.FAILED: "✗", - InitStatus.SKIPPED: "○", - InitStatus.RUNNING: "⟳", - }.get(self.status, "?") - return f"{status_emoji} {self.name}: {self.message}" - - -@dataclass -class InitTask: - """初始化任务配置""" - name: str - func: Callable - dependencies: List[str] = field(default_factory=list) - enabled: bool = True - retry_times: int = 0 - retry_delay: float = 1.0 - critical: bool = True # 失败是否阻止服务启动 - - def __post_init__(self): - if not self.name: - raise ValueError("Task name cannot be empty") - - -class AppInitializer: - """ - 应用初始化管理器 - - 使用示例: - initializer = AppInitializer() - - # 添加初始化任务 - @initializer.task("init_database") - async def init_database(): - # 数据库初始化逻辑 - pass - - @initializer.task("init_cache", dependencies=["init_database"]) - async def init_cache(): - # 缓存初始化逻辑(依赖数据库先初始化) - pass - - # 执行初始化 - results = await initializer.initialize() - """ - - def __init__(self, app_name: str = "FastAPI App"): - self.app_name = app_name - self.tasks: Dict[str, InitTask] = {} - self.results: List[InitResult] = [] - self._before_hooks: List[Callable] = [] - self._after_hooks: List[Callable] = [] - - def task( - self, - name: str, - dependencies: Optional[List[str]] = None, - enabled: bool = True, - retry_times: int = 0, - retry_delay: float = 1.0, - critical: bool = True - ) -> Callable: - """ - 装饰器:注册初始化任务 - - Args: - name: 任务名称 - dependencies: 依赖的任务名称列表 - enabled: 是否启用 - retry_times: 失败重试次数 - retry_delay: 重试延迟(秒) - critical: 是否为关键任务(失败则阻止启动) - """ - def decorator(func: Callable) -> Callable: - self.tasks[name] = InitTask( - name=name, - func=func, - dependencies=dependencies or [], - enabled=enabled, - retry_times=retry_times, - retry_delay=retry_delay, - critical=critical - ) - return func - return decorator - - def add_task( - self, - name: str, - func: Callable, - dependencies: Optional[List[str]] = None, - enabled: bool = True, - retry_times: int = 0, - retry_delay: float = 1.0, - critical: bool = True - ) -> None: - """手动添加初始化任务""" - self.tasks[name] = InitTask( - name=name, - func=func, - dependencies=dependencies or [], - enabled=enabled, - retry_times=retry_times, - retry_delay=retry_delay, - critical=critical - ) - - def before_start(self, func: Callable) -> Callable: - """装饰器:添加初始化前执行的钩子""" - self._before_hooks.append(func) - return func - - def after_start(self, func: Callable) -> Callable: - """装饰器:添加初始化后执行的钩子""" - self._after_hooks.append(func) - return func - - async def initialize(self) -> List[InitResult]: - """ - 执行所有初始化任务 - - Returns: - List[InitResult]: 初始化结果列表 - """ - import time - start_time = time.time() - - logger.info(f"{'='*60}") - logger.info(f"开始初始化 {self.app_name}") - logger.info(f"{'='*60}") - - # 执行前置钩子 - for hook in self._before_hooks: - try: - if asyncio.iscoroutinefunction(hook): - await hook() - else: - hook() - except Exception as e: - logger.error(f"前置钩子执行失败: {e}") - - self.results = [] - executed = set() - failed_critical = False - - # 按拓扑顺序执行任务 - while len(executed) < len(self.tasks): - # 找出所有可以执行的任务(依赖已满足或未启用) - ready_tasks = [ - task for name, task in self.tasks.items() - if name not in executed and task.enabled and - all(dep in executed or dep not in self.tasks for dep in task.dependencies) - ] - - if not ready_tasks: - # 检查是否有未执行的任务 - remaining = [name for name in self.tasks if name not in executed and self.tasks[name].enabled] - if remaining: - logger.error(f"存在循环依赖或未满足的依赖: {remaining}") - break - - # 并发执行所有就绪的任务 - results = await asyncio.gather( - *[self._execute_task(task) for task in ready_tasks], - return_exceptions=True - ) - - for result in results: - if isinstance(result, Exception): - logger.error(f"任务执行异常: {result}") - continue - - self.results.append(result) - executed.add(result.name) - - if result.status == InitStatus.FAILED and self.tasks[result.name].critical: - failed_critical = True - logger.error(f"关键任务 {result.name} 初始化失败,停止后续初始化") - - # 执行后置钩子 - if not failed_critical: - for hook in self._after_hooks: - try: - if asyncio.iscoroutinefunction(hook): - await hook() - else: - hook() - except Exception as e: - logger.error(f"后置钩子执行失败: {e}") - - # 打印摘要 - duration = time.time() - start_time - self._print_summary(duration, failed_critical) - - return self.results - - async def _execute_task(self, task: InitTask) -> InitResult: - """执行单个初始化任务""" - import time - start_time = time.time() - - result = InitResult(name=task.name, status=InitStatus.PENDING) - - for attempt in range(task.retry_times + 1): - try: - result.status = InitStatus.RUNNING - logger.info(f"正在执行: {task.name}" + (f" (重试 {attempt}/{task.retry_times})" if attempt > 0 else "")) - - # 执行任务 - if asyncio.iscoroutinefunction(task.func): - await task.func() - else: - task.func() - - result.status = InitStatus.SUCCESS - result.message = "初始化成功" - break - - except Exception as e: - if attempt < task.retry_times: - logger.warning(f"{task.name} 失败,{task.retry_delay}秒后重试: {e}") - await asyncio.sleep(task.retry_delay) - else: - result.status = InitStatus.FAILED - result.message = f"初始化失败: {str(e)}" - result.error = e - logger.error(f"{task.name} 失败: {e}\n{traceback.format_exc()}") - - result.duration_ms = (time.time() - start_time) * 1000 - return result - - def _print_summary(self, duration: float, failed_critical: bool) -> None: - """打印初始化摘要""" - logger.info(f"{'='*60}") - logger.info(f"初始化完成 (耗时: {duration:.2f}秒)") - logger.info(f"{'='*60}") - - # 统计 - success_count = sum(1 for r in self.results if r.status == InitStatus.SUCCESS) - failed_count = sum(1 for r in self.results if r.status == InitStatus.FAILED) - skipped_count = sum(1 for r in self.results if r.status == InitStatus.SKIPPED) - - logger.info(f"总计: {len(self.results)} 个任务") - logger.info(f" 成功: {success_count}") - logger.info(f" 失败: {failed_count}") - logger.info(f" 跳过: {skipped_count}") - - # 详细结果 - if self.results: - logger.info(f"\n详细结果:") - for result in self.results: - logger.info(f" {result}") - - if failed_critical: - logger.error(f"{'='*60}") - logger.error(f"关键任务初始化失败,服务可能无法正常工作!") - logger.error(f"{'='*60}") - - def get_results(self) -> List[InitResult]: - """获取初始化结果""" - return self.results - - def is_successful(self) -> bool: - """检查是否所有关键任务都初始化成功""" - for result in self.results: - if result.status == InitStatus.FAILED: - task = self.tasks.get(result.name) - if task and task.critical: - return False - return True - - -# 全局初始化器实例 -_global_initializer: Optional[AppInitializer] = None - - -def get_initializer() -> AppInitializer: - """获取全局初始化器实例""" - global _global_initializer - if _global_initializer is None: - _global_initializer = AppInitializer() - return _global_initializer - - -def set_initializer(initializer: AppInitializer) -> None: - """设置全局初始化器实例""" - global _global_initializer - _global_initializer = initializer diff --git a/docling/app/server.py b/docling/app/server.py index 368e23d..3ce5d4e 100644 --- a/docling/app/server.py +++ b/docling/app/server.py @@ -64,6 +64,12 @@ from app.services.pdf_converter import ( markdown_file_to_pdf_bytes, read_file_content, ) +from app.core.ScheduledTaskManager import ScheduledTaskManager +from app.core.ScheduledTask import ScheduledTask + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) """ @api Server Application @@ -78,6 +84,122 @@ app.add_middleware( allow_headers=["*"], ) + +# ==================== 调度任务管理器 ==================== +# 全局调度任务管理器实例,用于管理所有定时任务 +# 使用 Optional 类型注解,表示可能为 None(服务启动前/关闭后) +_task_manager: Optional[ScheduledTaskManager] = None + + +def get_task_manager() -> Optional[ScheduledTaskManager]: + """ + 获取全局调度任务管理器实例 + + Returns: + Optional[ScheduledTaskManager]: 调度器实例,如果未初始化则返回 None + + 使用示例: + task_manager = get_task_manager() + if task_manager: + task_manager.add_daily_task("task_id", 10, 30, some_function) + """ + global _task_manager + return _task_manager + + +# 定义定时任务函数 +def scheduled_method(): + """定时任务执行的函数""" + logger.info("执行定时任务...") + + +@app.on_event("startup") +async def startup_event(): + """ + FastAPI 服务启动事件处理函数 + + 在服务启动时自动执行,完成以下初始化工作: + 1. 创建 ScheduledTaskManager 实例 + 2. 启动后台调度器(基于 APScheduler BackgroundScheduler) + 3. 添加定时任务 + 4. 记录初始化日志 + + 注意:此函数在 FastAPI 接收任何请求之前执行 + """ + global _task_manager + + logger.info("=" * 60) + logger.info("服务启动初始化...") + logger.info("=" * 60) + + try: + # 步骤1: 创建调度任务管理器实例 + _task_manager = ScheduledTaskManager() + logger.info("调度任务管理器已初始化") + + # 步骤2: 启动调度器 + _task_manager.start() + logger.info("任务调度器已启动") + + # 步骤3: 创建 ScheduledTask 实例并添加定时任务 + scheduled_task = ScheduledTask(task_manager=_task_manager) + scheduled_task.add_cron_task( + task_id="main-task", + task_name="调度任务执行启动", + crontab="*/1 * * * *", # 每分钟执行一次 + task_func=scheduled_method, # 注意:不要加括号 + args=(), # 位置参数(元组) + kwargs={} # 关键字参数(字典) + ) + logger.info("已添加定时任务: main-task (每分钟执行一次)") + + logger.info("=" * 60) + logger.info("服务启动初始化完成") + logger.info("=" * 60) + + except Exception as e: + logger.error(f"服务启动初始化失败: {e}") + # 重新抛出异常,让 FastAPI 知道启动失败 + raise + +@app.on_event("shutdown") +async def shutdown_event(): + """ + FastAPI 服务关闭事件处理函数 + + 在服务关闭时自动执行,完成以下清理工作: + 1. 优雅关闭调度器(等待正在执行的任务完成) + 2. 清理全局变量引用 + + 注意:此函数在 FastAPI 停止接收新请求后执行 + """ + global _task_manager + + logger.info("=" * 60) + logger.info("服务关闭中...") + logger.info("=" * 60) + + try: + # 检查调度器是否已初始化 + if _task_manager is not None: + # 关闭调度器 + # shutdown() 会等待正在执行的任务完成,然后终止后台线程 + _task_manager.shutdown() + logger.info("任务调度器已关闭") + + # 清理全局变量引用,释放内存 + _task_manager = None + + logger.info("=" * 60) + logger.info("服务已关闭") + logger.info("=" * 60) + + except Exception as e: + # 记录错误但不重新抛出,确保服务能正常关闭 + logger.error(f"服务关闭时发生错误: {e}") + + +# ==================== 静态文件挂载 ==================== try: _ui_dir = Path(__file__).resolve().parents[2] / "frontend" / "dist" if _ui_dir.exists():