Compare commits
3 Commits
219576fa7c
...
v1.0.0
| Author | SHA1 | Date | |
|---|---|---|---|
| f16ca4fbdc | |||
| ae9f7a681f | |||
| 8083074874 |
205
docling/app/core/ScheduledTask.py
Normal file
205
docling/app/core/ScheduledTask.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""
|
||||
定时任务基类
|
||||
|
||||
提供 crontab 表达式到 APScheduler CronTrigger 的转换功能
|
||||
以及便捷的任务添加方法
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from app.core.ScheduledTaskManager import ScheduledTaskManager
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ScheduledTask:
|
||||
"""
|
||||
定时任务基类
|
||||
|
||||
提供将标准 crontab 表达式转换为 APScheduler CronTrigger 对象的功能
|
||||
|
||||
Crontab 表达式格式:分 时 日 月 周
|
||||
例如:
|
||||
- "0 2 * * *" -> 每天凌晨 2:00 执行
|
||||
- "*/5 * * * *" -> 每 5 分钟执行一次
|
||||
- "0 0 * * 0" -> 每周日 00:00 执行
|
||||
- "0 0 1 * *" -> 每月 1 日 00:00 执行
|
||||
"""
|
||||
|
||||
def __init__(self, task_manager: Optional[ScheduledTaskManager] = None):
|
||||
"""
|
||||
初始化定时任务
|
||||
|
||||
Args:
|
||||
task_manager: 可选的调度任务管理器实例,如果不提供则需要外部提供
|
||||
"""
|
||||
self.task_manager = task_manager
|
||||
|
||||
def add_cron_task(
|
||||
self,
|
||||
task_id: str,
|
||||
task_name: str,
|
||||
crontab: str,
|
||||
task_func,
|
||||
args=None,
|
||||
kwargs=None
|
||||
):
|
||||
"""
|
||||
通过 crontab 表达式添加定时任务
|
||||
|
||||
Args:
|
||||
task_id: 任务唯一标识符
|
||||
task_name: 任务名称
|
||||
crontab: crontab 表达式字符串,格式为 "分 时 日 月 周"
|
||||
例如: "0 2 * * *" 表示每天凌晨 2 点执行
|
||||
task_func: 要执行的任务函数
|
||||
args: 传递给任务函数的位置参数(元组)
|
||||
kwargs: 传递给任务函数的关键字参数(字典)
|
||||
|
||||
Returns:
|
||||
bool: 添加成功返回 True,失败返回 False
|
||||
|
||||
示例:
|
||||
task = ScheduledTask(task_manager)
|
||||
task.add_cron_task(
|
||||
task_id="daily_backup",
|
||||
task_name="每日备份",
|
||||
crontab="0 2 * * *",
|
||||
task_func=backup_function,
|
||||
args=()
|
||||
)
|
||||
"""
|
||||
if self.task_manager is None:
|
||||
logger.error("任务管理器未初始化,无法添加任务")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 将 crontab 表达式转换为 CronTrigger
|
||||
trigger = self.covert(crontab)
|
||||
if trigger is None:
|
||||
logger.error(f"crontab 表达式转换失败: {crontab}")
|
||||
return False
|
||||
|
||||
# 使用调度器添加任务
|
||||
job = self.task_manager.scheduler.add_job(
|
||||
task_func,
|
||||
trigger,
|
||||
id=task_id,
|
||||
name=task_name,
|
||||
args=args,
|
||||
kwargs=kwargs
|
||||
)
|
||||
|
||||
# 记录到 jobs 字典中
|
||||
self.task_manager.jobs[task_id] = job
|
||||
|
||||
logger.info(f"已添加定时任务: {task_name} ({task_id}), crontab: {crontab}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"添加定时任务失败: {e}")
|
||||
return False
|
||||
|
||||
def covert(self, crontab: str) -> Optional[CronTrigger]:
|
||||
"""
|
||||
将字符串的 crontab 表达式转换为 CronTrigger 对象
|
||||
|
||||
Args:
|
||||
crontab: crontab 表达式字符串
|
||||
格式: "分钟 小时 日期 月份 星期"
|
||||
支持的特殊字符: * (任意值), / (间隔), - (范围), , (列表)
|
||||
|
||||
Returns:
|
||||
Optional[CronTrigger]: CronTrigger 对象,解析失败返回 None
|
||||
|
||||
Crontab 字段说明:
|
||||
字段 取值范围 特殊字符
|
||||
分钟 0-59 * / - ,
|
||||
小时 0-23 * / - ,
|
||||
日期 1-31 * / - ,
|
||||
月份 1-12 * / - ,
|
||||
星期 0-7 (0和7都代表周日) * / - ,
|
||||
|
||||
示例:
|
||||
"0 2 * * *" -> 每天凌晨 2:00
|
||||
"*/5 * * * *" -> 每 5 分钟
|
||||
"0 0 * * 0" -> 每周日 00:00
|
||||
"0 0 1 * *" -> 每月 1 日 00:00
|
||||
"30 8 * * 1-5" -> 工作日早上 8:30
|
||||
"0 9-17 * * 1-5" -> 工作日每小时 9:00-17:00
|
||||
"""
|
||||
if not crontab or not isinstance(crontab, str):
|
||||
logger.error(f"无效的 crontab 表达式: {crontab}")
|
||||
return None
|
||||
|
||||
try:
|
||||
# 去除首尾空格并按空格分割
|
||||
parts = crontab.strip().split()
|
||||
|
||||
# 验证格式:crontab 表达式应该有 5 或 6 个部分
|
||||
# 5 部分: 分 时 日 月 周
|
||||
# 6 部分: 分 时 日 月 周 年 (可选)
|
||||
if len(parts) < 5 or len(parts) > 6:
|
||||
logger.error(f"crontab 表达式格式错误,应为 5 或 6 个部分,实际为 {len(parts)} 个: {crontab}")
|
||||
return None
|
||||
|
||||
# 解析各个字段
|
||||
minute = parts[0] # 分钟: 0-59
|
||||
hour = parts[1] # 小时: 0-23
|
||||
day = parts[2] # 日期: 1-31
|
||||
month = parts[3] # 月份: 1-12
|
||||
day_of_week = parts[4] # 星期: 0-7 (0 和 7 都代表周日)
|
||||
|
||||
# 处理星期字段的特殊值
|
||||
# 在 crontab 中,0 和 7 都代表周日
|
||||
# 在 APScheduler 中,周日使用 6 (mon=0, tue=1, ..., sun=6)
|
||||
# 需要进行转换
|
||||
if day_of_week in ['0', '7']:
|
||||
day_of_week = '6' # 转换为 APScheduler 的周日表示
|
||||
|
||||
# 创建 CronTrigger 对象
|
||||
trigger = CronTrigger(
|
||||
minute=minute,
|
||||
hour=hour,
|
||||
day=day,
|
||||
month=month,
|
||||
day_of_week=day_of_week
|
||||
)
|
||||
|
||||
logger.debug(f"成功转换 crontab 表达式: {crontab} -> {trigger}")
|
||||
return trigger
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"crontab 表达式值无效: {crontab}, 错误: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"转换 crontab 表达式失败: {crontab}, 错误: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 创建任务实例
|
||||
task = ScheduledTask()
|
||||
|
||||
# 测试各种 crontab 表达式
|
||||
test_cases = [
|
||||
"0 2 * * *", # 每天凌晨 2 点
|
||||
"*/5 * * * *", # 每 5 分钟
|
||||
"0 0 * * 0", # 每周日
|
||||
"0 0 1 * *", # 每月 1 日
|
||||
"30 8 * * 1-5", # 工作日早上 8:30
|
||||
"0 9-17 * * 1-5", # 工作日每小时
|
||||
]
|
||||
|
||||
print("Crontab 表达式转换测试:")
|
||||
print("-" * 60)
|
||||
|
||||
for crontab in test_cases:
|
||||
trigger = task.covert(crontab)
|
||||
if trigger:
|
||||
print(f"✓ {crontab:20s} -> {trigger}")
|
||||
else:
|
||||
print(f"✗ {crontab:20s} -> 转换失败")
|
||||
157
docling/app/core/ScheduledTaskManager.py
Normal file
157
docling/app/core/ScheduledTaskManager.py
Normal file
@@ -0,0 +1,157 @@
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ScheduledTaskManager:
|
||||
def __init__(self):
|
||||
"""初始化调度器"""
|
||||
self.scheduler = BackgroundScheduler()
|
||||
self.jobs = {}
|
||||
|
||||
# 添加任务
|
||||
def _add_task(self, task_id,task_name, trigger, task_func, args=None, kwargs=None):
|
||||
|
||||
job = self.scheduler.add_job(
|
||||
task_func, trigger, id=task_id,
|
||||
args=args, kwargs=kwargs
|
||||
)
|
||||
self.jobs[task_id] = job
|
||||
logger.info(f"已添加{task_name}任务 taskId: {task_id}, 执行时间: {trigger}")
|
||||
|
||||
def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None):
|
||||
"""添加间隔任务(备用方案)
|
||||
|
||||
Args:
|
||||
task_id: 任务唯一标识
|
||||
interval_type: 'seconds', 'minutes', 'hours', 'days'
|
||||
value: 间隔值
|
||||
"""
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
|
||||
if interval_type == 'seconds':
|
||||
trigger = IntervalTrigger(seconds=value)
|
||||
elif interval_type == 'minutes':
|
||||
trigger = IntervalTrigger(minutes=value)
|
||||
elif interval_type == 'hours':
|
||||
trigger = IntervalTrigger(hours=value)
|
||||
elif interval_type == 'days':
|
||||
trigger = IntervalTrigger(days=value)
|
||||
else:
|
||||
raise ValueError(f"不支持的间隔类型: {interval_type}")
|
||||
|
||||
job = self.scheduler.add_job(
|
||||
task_func, trigger, id=task_id,
|
||||
args=args, kwargs=kwargs
|
||||
)
|
||||
self.jobs[task_id] = job
|
||||
logger.info(f"已添加间隔任务: {task_id}, 间隔: 每{value}{interval_type}")
|
||||
|
||||
# 移除任务
|
||||
def remove_task(self, task_id):
|
||||
"""移除任务"""
|
||||
if task_id in self.jobs:
|
||||
self.scheduler.remove_job(task_id)
|
||||
del self.jobs[task_id]
|
||||
logger.info(f"已移除任务: {task_id}")
|
||||
|
||||
# 暂停任务
|
||||
def pause_task(self, task_id):
|
||||
"""暂停任务"""
|
||||
if task_id in self.jobs:
|
||||
self.jobs[task_id].pause()
|
||||
logger.info(f"已暂停任务: {task_id}")
|
||||
|
||||
# 重启任务
|
||||
def resume_task(self, task_id):
|
||||
"""恢复任务"""
|
||||
if task_id in self.jobs:
|
||||
self.jobs[task_id].resume()
|
||||
logger.info(f"已恢复任务: {task_id}")
|
||||
|
||||
# 获取全部任务
|
||||
def get_all_tasks(self):
|
||||
"""获取所有任务信息"""
|
||||
tasks_info = []
|
||||
for job_id, job in self.jobs.items():
|
||||
tasks_info.append({
|
||||
'id': job_id,
|
||||
'name': job.name,
|
||||
'next_run_time': job.next_run_time,
|
||||
'trigger': str(job.trigger)
|
||||
})
|
||||
return tasks_info
|
||||
|
||||
#启动任务
|
||||
def start(self):
|
||||
"""启动调度器"""
|
||||
self.scheduler.start()
|
||||
logger.info("调度器已启动")
|
||||
|
||||
# 关闭
|
||||
def shutdown(self):
|
||||
"""关闭调度器"""
|
||||
self.scheduler.shutdown()
|
||||
logger.info("调度器已关闭")
|
||||
|
||||
|
||||
# 示例任务函数
|
||||
def sample_task(task_name, *args, **kwargs):
|
||||
"""示例任务函数"""
|
||||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{now}] 执行任务: {task_name}")
|
||||
if args:
|
||||
print(f"位置参数: {args}")
|
||||
if kwargs:
|
||||
print(f"关键字参数: {kwargs}")
|
||||
print("-" * 50)
|
||||
print()
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 创建任务管理器
|
||||
manager = ScheduledTaskManager()
|
||||
|
||||
# 启动调度器
|
||||
manager.start()
|
||||
|
||||
try:
|
||||
# 添加每日任务 (每天10:30执行)
|
||||
# # 添加每月任务 (每月1日9:00执行)
|
||||
# manager.add_monthly_task(
|
||||
# task_id="monthly_summary",
|
||||
# day=1,
|
||||
# hour=9, 12354354345321.21135432213245322154373554
|
||||
# minute=0,
|
||||
# task_func=sample_task,
|
||||
# args=("月度汇总",)
|
||||
# )
|
||||
#
|
||||
# # 添加每小时任务 (每小时的第15分钟执行)
|
||||
# manager.add_hourly_task(
|
||||
# task_id="hourly_check",
|
||||
# minute=15,
|
||||
# task_func=sample_task,
|
||||
# args=("每小时检查",)
|
||||
# )
|
||||
|
||||
# 查看任务列表
|
||||
print("当前任务列表:")
|
||||
for task in manager.get_all_tasks():
|
||||
print(f" - {task['id']}: 下次执行时间: {task['next_run_time']}")
|
||||
|
||||
# 保持程序运行
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n收到中断信号,正在关闭...")
|
||||
finally:
|
||||
manager.shutdown()
|
||||
1
docling/app/core/__init__.py
Normal file
1
docling/app/core/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Core module for application initialization and configuration
|
||||
@@ -64,6 +64,12 @@ from app.services.pdf_converter import (
|
||||
markdown_file_to_pdf_bytes,
|
||||
read_file_content,
|
||||
)
|
||||
from app.core.ScheduledTaskManager import ScheduledTaskManager
|
||||
from app.core.ScheduledTask import ScheduledTask
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
"""
|
||||
@api Server Application
|
||||
@@ -78,6 +84,122 @@ app.add_middleware(
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
# ==================== 调度任务管理器 ====================
|
||||
# 全局调度任务管理器实例,用于管理所有定时任务
|
||||
# 使用 Optional 类型注解,表示可能为 None(服务启动前/关闭后)
|
||||
_task_manager: Optional[ScheduledTaskManager] = None
|
||||
|
||||
|
||||
def get_task_manager() -> Optional[ScheduledTaskManager]:
|
||||
"""
|
||||
获取全局调度任务管理器实例
|
||||
|
||||
Returns:
|
||||
Optional[ScheduledTaskManager]: 调度器实例,如果未初始化则返回 None
|
||||
|
||||
使用示例:
|
||||
task_manager = get_task_manager()
|
||||
if task_manager:
|
||||
task_manager.add_daily_task("task_id", 10, 30, some_function)
|
||||
"""
|
||||
global _task_manager
|
||||
return _task_manager
|
||||
|
||||
|
||||
# 定义定时任务函数
|
||||
def scheduled_method():
|
||||
"""定时任务执行的函数"""
|
||||
logger.info("执行定时任务...")
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""
|
||||
FastAPI 服务启动事件处理函数
|
||||
|
||||
在服务启动时自动执行,完成以下初始化工作:
|
||||
1. 创建 ScheduledTaskManager 实例
|
||||
2. 启动后台调度器(基于 APScheduler BackgroundScheduler)
|
||||
3. 添加定时任务
|
||||
4. 记录初始化日志
|
||||
|
||||
注意:此函数在 FastAPI 接收任何请求之前执行
|
||||
"""
|
||||
global _task_manager
|
||||
|
||||
logger.info("=" * 60)
|
||||
logger.info("服务启动初始化...")
|
||||
logger.info("=" * 60)
|
||||
|
||||
try:
|
||||
# 步骤1: 创建调度任务管理器实例
|
||||
_task_manager = ScheduledTaskManager()
|
||||
logger.info("调度任务管理器已初始化")
|
||||
|
||||
# 步骤2: 启动调度器
|
||||
_task_manager.start()
|
||||
logger.info("任务调度器已启动")
|
||||
|
||||
# 步骤3: 创建 ScheduledTask 实例并添加定时任务
|
||||
scheduled_task = ScheduledTask(task_manager=_task_manager)
|
||||
scheduled_task.add_cron_task(
|
||||
task_id="main-task",
|
||||
task_name="调度任务执行启动",
|
||||
crontab="*/1 * * * *", # 每分钟执行一次
|
||||
task_func=scheduled_method, # 注意:不要加括号
|
||||
args=(), # 位置参数(元组)
|
||||
kwargs={} # 关键字参数(字典)
|
||||
)
|
||||
logger.info("已添加定时任务: main-task (每分钟执行一次)")
|
||||
|
||||
logger.info("=" * 60)
|
||||
logger.info("服务启动初始化完成")
|
||||
logger.info("=" * 60)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"服务启动初始化失败: {e}")
|
||||
# 重新抛出异常,让 FastAPI 知道启动失败
|
||||
raise
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def shutdown_event():
|
||||
"""
|
||||
FastAPI 服务关闭事件处理函数
|
||||
|
||||
在服务关闭时自动执行,完成以下清理工作:
|
||||
1. 优雅关闭调度器(等待正在执行的任务完成)
|
||||
2. 清理全局变量引用
|
||||
|
||||
注意:此函数在 FastAPI 停止接收新请求后执行
|
||||
"""
|
||||
global _task_manager
|
||||
|
||||
logger.info("=" * 60)
|
||||
logger.info("服务关闭中...")
|
||||
logger.info("=" * 60)
|
||||
|
||||
try:
|
||||
# 检查调度器是否已初始化
|
||||
if _task_manager is not None:
|
||||
# 关闭调度器
|
||||
# shutdown() 会等待正在执行的任务完成,然后终止后台线程
|
||||
_task_manager.shutdown()
|
||||
logger.info("任务调度器已关闭")
|
||||
|
||||
# 清理全局变量引用,释放内存
|
||||
_task_manager = None
|
||||
|
||||
logger.info("=" * 60)
|
||||
logger.info("服务已关闭")
|
||||
logger.info("=" * 60)
|
||||
|
||||
except Exception as e:
|
||||
# 记录错误但不重新抛出,确保服务能正常关闭
|
||||
logger.error(f"服务关闭时发生错误: {e}")
|
||||
|
||||
|
||||
# ==================== 静态文件挂载 ====================
|
||||
try:
|
||||
_ui_dir = Path(__file__).resolve().parents[2] / "frontend" / "dist"
|
||||
if _ui_dir.exists():
|
||||
|
||||
@@ -26,3 +26,4 @@ safetensors
|
||||
scipy
|
||||
opencv-python
|
||||
pymupdf
|
||||
apscheduler
|
||||
|
||||
Reference in New Issue
Block a user