Compare commits

...

3 Commits

Author SHA1 Message Date
f16ca4fbdc add 添加基于crontab表达式完成的调度任务组件 2026-02-02 00:02:36 +08:00
ae9f7a681f add:调度任务 2026-02-01 21:25:56 +08:00
8083074874 add:调度任务 2026-02-01 18:42:27 +08:00
5 changed files with 486 additions and 0 deletions

View File

@@ -0,0 +1,205 @@
"""
定时任务基类
提供 crontab 表达式到 APScheduler CronTrigger 的转换功能
以及便捷的任务添加方法
"""
import logging
from typing import Optional
from app.core.ScheduledTaskManager import ScheduledTaskManager
from apscheduler.triggers.cron import CronTrigger
logger = logging.getLogger(__name__)
class ScheduledTask:
"""
定时任务基类
提供将标准 crontab 表达式转换为 APScheduler CronTrigger 对象的功能
Crontab 表达式格式:分 时 日 月 周
例如:
- "0 2 * * *" -> 每天凌晨 2:00 执行
- "*/5 * * * *" -> 每 5 分钟执行一次
- "0 0 * * 0" -> 每周日 00:00 执行
- "0 0 1 * *" -> 每月 1 日 00:00 执行
"""
def __init__(self, task_manager: Optional[ScheduledTaskManager] = None):
"""
初始化定时任务
Args:
task_manager: 可选的调度任务管理器实例,如果不提供则需要外部提供
"""
self.task_manager = task_manager
def add_cron_task(
self,
task_id: str,
task_name: str,
crontab: str,
task_func,
args=None,
kwargs=None
):
"""
通过 crontab 表达式添加定时任务
Args:
task_id: 任务唯一标识符
task_name: 任务名称
crontab: crontab 表达式字符串,格式为 "分 时 日 月 周"
例如: "0 2 * * *" 表示每天凌晨 2 点执行
task_func: 要执行的任务函数
args: 传递给任务函数的位置参数(元组)
kwargs: 传递给任务函数的关键字参数(字典)
Returns:
bool: 添加成功返回 True失败返回 False
示例:
task = ScheduledTask(task_manager)
task.add_cron_task(
task_id="daily_backup",
task_name="每日备份",
crontab="0 2 * * *",
task_func=backup_function,
args=()
)
"""
if self.task_manager is None:
logger.error("任务管理器未初始化,无法添加任务")
return False
try:
# 将 crontab 表达式转换为 CronTrigger
trigger = self.covert(crontab)
if trigger is None:
logger.error(f"crontab 表达式转换失败: {crontab}")
return False
# 使用调度器添加任务
job = self.task_manager.scheduler.add_job(
task_func,
trigger,
id=task_id,
name=task_name,
args=args,
kwargs=kwargs
)
# 记录到 jobs 字典中
self.task_manager.jobs[task_id] = job
logger.info(f"已添加定时任务: {task_name} ({task_id}), crontab: {crontab}")
return True
except Exception as e:
logger.error(f"添加定时任务失败: {e}")
return False
def covert(self, crontab: str) -> Optional[CronTrigger]:
"""
将字符串的 crontab 表达式转换为 CronTrigger 对象
Args:
crontab: crontab 表达式字符串
格式: "分钟 小时 日期 月份 星期"
支持的特殊字符: * (任意值), / (间隔), - (范围), , (列表)
Returns:
Optional[CronTrigger]: CronTrigger 对象,解析失败返回 None
Crontab 字段说明:
字段 取值范围 特殊字符
分钟 0-59 * / - ,
小时 0-23 * / - ,
日期 1-31 * / - ,
月份 1-12 * / - ,
星期 0-7 (0和7都代表周日) * / - ,
示例:
"0 2 * * *" -> 每天凌晨 2:00
"*/5 * * * *" -> 每 5 分钟
"0 0 * * 0" -> 每周日 00:00
"0 0 1 * *" -> 每月 1 日 00:00
"30 8 * * 1-5" -> 工作日早上 8:30
"0 9-17 * * 1-5" -> 工作日每小时 9:00-17:00
"""
if not crontab or not isinstance(crontab, str):
logger.error(f"无效的 crontab 表达式: {crontab}")
return None
try:
# 去除首尾空格并按空格分割
parts = crontab.strip().split()
# 验证格式crontab 表达式应该有 5 或 6 个部分
# 5 部分: 分 时 日 月 周
# 6 部分: 分 时 日 月 周 年 (可选)
if len(parts) < 5 or len(parts) > 6:
logger.error(f"crontab 表达式格式错误,应为 5 或 6 个部分,实际为 {len(parts)} 个: {crontab}")
return None
# 解析各个字段
minute = parts[0] # 分钟: 0-59
hour = parts[1] # 小时: 0-23
day = parts[2] # 日期: 1-31
month = parts[3] # 月份: 1-12
day_of_week = parts[4] # 星期: 0-7 (0 和 7 都代表周日)
# 处理星期字段的特殊值
# 在 crontab 中0 和 7 都代表周日
# 在 APScheduler 中,周日使用 6 (mon=0, tue=1, ..., sun=6)
# 需要进行转换
if day_of_week in ['0', '7']:
day_of_week = '6' # 转换为 APScheduler 的周日表示
# 创建 CronTrigger 对象
trigger = CronTrigger(
minute=minute,
hour=hour,
day=day,
month=month,
day_of_week=day_of_week
)
logger.debug(f"成功转换 crontab 表达式: {crontab} -> {trigger}")
return trigger
except ValueError as e:
logger.error(f"crontab 表达式值无效: {crontab}, 错误: {e}")
return None
except Exception as e:
logger.error(f"转换 crontab 表达式失败: {crontab}, 错误: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 创建任务实例
task = ScheduledTask()
# 测试各种 crontab 表达式
test_cases = [
"0 2 * * *", # 每天凌晨 2 点
"*/5 * * * *", # 每 5 分钟
"0 0 * * 0", # 每周日
"0 0 1 * *", # 每月 1 日
"30 8 * * 1-5", # 工作日早上 8:30
"0 9-17 * * 1-5", # 工作日每小时
]
print("Crontab 表达式转换测试:")
print("-" * 60)
for crontab in test_cases:
trigger = task.covert(crontab)
if trigger:
print(f"{crontab:20s} -> {trigger}")
else:
print(f"{crontab:20s} -> 转换失败")

View File

@@ -0,0 +1,157 @@
import logging
import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ScheduledTaskManager:
def __init__(self):
"""初始化调度器"""
self.scheduler = BackgroundScheduler()
self.jobs = {}
# 添加任务
def _add_task(self, task_id,task_name, trigger, task_func, args=None, kwargs=None):
job = self.scheduler.add_job(
task_func, trigger, id=task_id,
args=args, kwargs=kwargs
)
self.jobs[task_id] = job
logger.info(f"已添加{task_name}任务 taskId: {task_id}, 执行时间: {trigger}")
def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None):
"""添加间隔任务(备用方案)
Args:
task_id: 任务唯一标识
interval_type: 'seconds', 'minutes', 'hours', 'days'
value: 间隔值
"""
from apscheduler.triggers.interval import IntervalTrigger
if interval_type == 'seconds':
trigger = IntervalTrigger(seconds=value)
elif interval_type == 'minutes':
trigger = IntervalTrigger(minutes=value)
elif interval_type == 'hours':
trigger = IntervalTrigger(hours=value)
elif interval_type == 'days':
trigger = IntervalTrigger(days=value)
else:
raise ValueError(f"不支持的间隔类型: {interval_type}")
job = self.scheduler.add_job(
task_func, trigger, id=task_id,
args=args, kwargs=kwargs
)
self.jobs[task_id] = job
logger.info(f"已添加间隔任务: {task_id}, 间隔: 每{value}{interval_type}")
# 移除任务
def remove_task(self, task_id):
"""移除任务"""
if task_id in self.jobs:
self.scheduler.remove_job(task_id)
del self.jobs[task_id]
logger.info(f"已移除任务: {task_id}")
# 暂停任务
def pause_task(self, task_id):
"""暂停任务"""
if task_id in self.jobs:
self.jobs[task_id].pause()
logger.info(f"已暂停任务: {task_id}")
# 重启任务
def resume_task(self, task_id):
"""恢复任务"""
if task_id in self.jobs:
self.jobs[task_id].resume()
logger.info(f"已恢复任务: {task_id}")
# 获取全部任务
def get_all_tasks(self):
"""获取所有任务信息"""
tasks_info = []
for job_id, job in self.jobs.items():
tasks_info.append({
'id': job_id,
'name': job.name,
'next_run_time': job.next_run_time,
'trigger': str(job.trigger)
})
return tasks_info
#启动任务
def start(self):
"""启动调度器"""
self.scheduler.start()
logger.info("调度器已启动")
# 关闭
def shutdown(self):
"""关闭调度器"""
self.scheduler.shutdown()
logger.info("调度器已关闭")
# 示例任务函数
def sample_task(task_name, *args, **kwargs):
"""示例任务函数"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{now}] 执行任务: {task_name}")
if args:
print(f"位置参数: {args}")
if kwargs:
print(f"关键字参数: {kwargs}")
print("-" * 50)
print()
# 使用示例
if __name__ == "__main__":
# 创建任务管理器
manager = ScheduledTaskManager()
# 启动调度器
manager.start()
try:
# 添加每日任务 (每天10:30执行)
# # 添加每月任务 (每月1日9:00执行)
# manager.add_monthly_task(
# task_id="monthly_summary",
# day=1,
# hour=9, 12354354345321.21135432213245322154373554
# minute=0,
# task_func=sample_task,
# args=("月度汇总",)
# )
#
# # 添加每小时任务 (每小时的第15分钟执行)
# manager.add_hourly_task(
# task_id="hourly_check",
# minute=15,
# task_func=sample_task,
# args=("每小时检查",)
# )
# 查看任务列表
print("当前任务列表:")
for task in manager.get_all_tasks():
print(f" - {task['id']}: 下次执行时间: {task['next_run_time']}")
# 保持程序运行
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n收到中断信号,正在关闭...")
finally:
manager.shutdown()

View File

@@ -0,0 +1 @@
# Core module for application initialization and configuration

View File

@@ -64,6 +64,12 @@ from app.services.pdf_converter import (
markdown_file_to_pdf_bytes,
read_file_content,
)
from app.core.ScheduledTaskManager import ScheduledTaskManager
from app.core.ScheduledTask import ScheduledTask
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
"""
@api Server Application
@@ -78,6 +84,122 @@ app.add_middleware(
allow_headers=["*"],
)
# ==================== 调度任务管理器 ====================
# 全局调度任务管理器实例,用于管理所有定时任务
# 使用 Optional 类型注解,表示可能为 None服务启动前/关闭后)
_task_manager: Optional[ScheduledTaskManager] = None
def get_task_manager() -> Optional[ScheduledTaskManager]:
"""
获取全局调度任务管理器实例
Returns:
Optional[ScheduledTaskManager]: 调度器实例,如果未初始化则返回 None
使用示例:
task_manager = get_task_manager()
if task_manager:
task_manager.add_daily_task("task_id", 10, 30, some_function)
"""
global _task_manager
return _task_manager
# 定义定时任务函数
def scheduled_method():
"""定时任务执行的函数"""
logger.info("执行定时任务...")
@app.on_event("startup")
async def startup_event():
"""
FastAPI 服务启动事件处理函数
在服务启动时自动执行,完成以下初始化工作:
1. 创建 ScheduledTaskManager 实例
2. 启动后台调度器(基于 APScheduler BackgroundScheduler
3. 添加定时任务
4. 记录初始化日志
注意:此函数在 FastAPI 接收任何请求之前执行
"""
global _task_manager
logger.info("=" * 60)
logger.info("服务启动初始化...")
logger.info("=" * 60)
try:
# 步骤1: 创建调度任务管理器实例
_task_manager = ScheduledTaskManager()
logger.info("调度任务管理器已初始化")
# 步骤2: 启动调度器
_task_manager.start()
logger.info("任务调度器已启动")
# 步骤3: 创建 ScheduledTask 实例并添加定时任务
scheduled_task = ScheduledTask(task_manager=_task_manager)
scheduled_task.add_cron_task(
task_id="main-task",
task_name="调度任务执行启动",
crontab="*/1 * * * *", # 每分钟执行一次
task_func=scheduled_method, # 注意:不要加括号
args=(), # 位置参数(元组)
kwargs={} # 关键字参数(字典)
)
logger.info("已添加定时任务: main-task (每分钟执行一次)")
logger.info("=" * 60)
logger.info("服务启动初始化完成")
logger.info("=" * 60)
except Exception as e:
logger.error(f"服务启动初始化失败: {e}")
# 重新抛出异常,让 FastAPI 知道启动失败
raise
@app.on_event("shutdown")
async def shutdown_event():
"""
FastAPI 服务关闭事件处理函数
在服务关闭时自动执行,完成以下清理工作:
1. 优雅关闭调度器(等待正在执行的任务完成)
2. 清理全局变量引用
注意:此函数在 FastAPI 停止接收新请求后执行
"""
global _task_manager
logger.info("=" * 60)
logger.info("服务关闭中...")
logger.info("=" * 60)
try:
# 检查调度器是否已初始化
if _task_manager is not None:
# 关闭调度器
# shutdown() 会等待正在执行的任务完成,然后终止后台线程
_task_manager.shutdown()
logger.info("任务调度器已关闭")
# 清理全局变量引用,释放内存
_task_manager = None
logger.info("=" * 60)
logger.info("服务已关闭")
logger.info("=" * 60)
except Exception as e:
# 记录错误但不重新抛出,确保服务能正常关闭
logger.error(f"服务关闭时发生错误: {e}")
# ==================== 静态文件挂载 ====================
try:
_ui_dir = Path(__file__).resolve().parents[2] / "frontend" / "dist"
if _ui_dir.exists():

View File

@@ -26,3 +26,4 @@ safetensors
scipy
opencv-python
pymupdf
apscheduler