add 添加基于crontab表达式完成的调度任务组件

This commit is contained in:
2026-02-02 00:02:36 +08:00
parent ae9f7a681f
commit f16ca4fbdc
4 changed files with 335 additions and 398 deletions

View File

@@ -0,0 +1,205 @@
"""
定时任务基类
提供 crontab 表达式到 APScheduler CronTrigger 的转换功能
以及便捷的任务添加方法
"""
import logging
from typing import Optional
from app.core.ScheduledTaskManager import ScheduledTaskManager
from apscheduler.triggers.cron import CronTrigger
logger = logging.getLogger(__name__)
class ScheduledTask:
"""
定时任务基类
提供将标准 crontab 表达式转换为 APScheduler CronTrigger 对象的功能
Crontab 表达式格式:分 时 日 月 周
例如:
- "0 2 * * *" -> 每天凌晨 2:00 执行
- "*/5 * * * *" -> 每 5 分钟执行一次
- "0 0 * * 0" -> 每周日 00:00 执行
- "0 0 1 * *" -> 每月 1 日 00:00 执行
"""
def __init__(self, task_manager: Optional[ScheduledTaskManager] = None):
"""
初始化定时任务
Args:
task_manager: 可选的调度任务管理器实例,如果不提供则需要外部提供
"""
self.task_manager = task_manager
def add_cron_task(
self,
task_id: str,
task_name: str,
crontab: str,
task_func,
args=None,
kwargs=None
):
"""
通过 crontab 表达式添加定时任务
Args:
task_id: 任务唯一标识符
task_name: 任务名称
crontab: crontab 表达式字符串,格式为 "分 时 日 月 周"
例如: "0 2 * * *" 表示每天凌晨 2 点执行
task_func: 要执行的任务函数
args: 传递给任务函数的位置参数(元组)
kwargs: 传递给任务函数的关键字参数(字典)
Returns:
bool: 添加成功返回 True失败返回 False
示例:
task = ScheduledTask(task_manager)
task.add_cron_task(
task_id="daily_backup",
task_name="每日备份",
crontab="0 2 * * *",
task_func=backup_function,
args=()
)
"""
if self.task_manager is None:
logger.error("任务管理器未初始化,无法添加任务")
return False
try:
# 将 crontab 表达式转换为 CronTrigger
trigger = self.covert(crontab)
if trigger is None:
logger.error(f"crontab 表达式转换失败: {crontab}")
return False
# 使用调度器添加任务
job = self.task_manager.scheduler.add_job(
task_func,
trigger,
id=task_id,
name=task_name,
args=args,
kwargs=kwargs
)
# 记录到 jobs 字典中
self.task_manager.jobs[task_id] = job
logger.info(f"已添加定时任务: {task_name} ({task_id}), crontab: {crontab}")
return True
except Exception as e:
logger.error(f"添加定时任务失败: {e}")
return False
def covert(self, crontab: str) -> Optional[CronTrigger]:
"""
将字符串的 crontab 表达式转换为 CronTrigger 对象
Args:
crontab: crontab 表达式字符串
格式: "分钟 小时 日期 月份 星期"
支持的特殊字符: * (任意值), / (间隔), - (范围), , (列表)
Returns:
Optional[CronTrigger]: CronTrigger 对象,解析失败返回 None
Crontab 字段说明:
字段 取值范围 特殊字符
分钟 0-59 * / - ,
小时 0-23 * / - ,
日期 1-31 * / - ,
月份 1-12 * / - ,
星期 0-7 (0和7都代表周日) * / - ,
示例:
"0 2 * * *" -> 每天凌晨 2:00
"*/5 * * * *" -> 每 5 分钟
"0 0 * * 0" -> 每周日 00:00
"0 0 1 * *" -> 每月 1 日 00:00
"30 8 * * 1-5" -> 工作日早上 8:30
"0 9-17 * * 1-5" -> 工作日每小时 9:00-17:00
"""
if not crontab or not isinstance(crontab, str):
logger.error(f"无效的 crontab 表达式: {crontab}")
return None
try:
# 去除首尾空格并按空格分割
parts = crontab.strip().split()
# 验证格式crontab 表达式应该有 5 或 6 个部分
# 5 部分: 分 时 日 月 周
# 6 部分: 分 时 日 月 周 年 (可选)
if len(parts) < 5 or len(parts) > 6:
logger.error(f"crontab 表达式格式错误,应为 5 或 6 个部分,实际为 {len(parts)} 个: {crontab}")
return None
# 解析各个字段
minute = parts[0] # 分钟: 0-59
hour = parts[1] # 小时: 0-23
day = parts[2] # 日期: 1-31
month = parts[3] # 月份: 1-12
day_of_week = parts[4] # 星期: 0-7 (0 和 7 都代表周日)
# 处理星期字段的特殊值
# 在 crontab 中0 和 7 都代表周日
# 在 APScheduler 中,周日使用 6 (mon=0, tue=1, ..., sun=6)
# 需要进行转换
if day_of_week in ['0', '7']:
day_of_week = '6' # 转换为 APScheduler 的周日表示
# 创建 CronTrigger 对象
trigger = CronTrigger(
minute=minute,
hour=hour,
day=day,
month=month,
day_of_week=day_of_week
)
logger.debug(f"成功转换 crontab 表达式: {crontab} -> {trigger}")
return trigger
except ValueError as e:
logger.error(f"crontab 表达式值无效: {crontab}, 错误: {e}")
return None
except Exception as e:
logger.error(f"转换 crontab 表达式失败: {crontab}, 错误: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 创建任务实例
task = ScheduledTask()
# 测试各种 crontab 表达式
test_cases = [
"0 2 * * *", # 每天凌晨 2 点
"*/5 * * * *", # 每 5 分钟
"0 0 * * 0", # 每周日
"0 0 1 * *", # 每月 1 日
"30 8 * * 1-5", # 工作日早上 8:30
"0 9-17 * * 1-5", # 工作日每小时
]
print("Crontab 表达式转换测试:")
print("-" * 60)
for crontab in test_cases:
trigger = task.covert(crontab)
if trigger:
print(f"{crontab:20s} -> {trigger}")
else:
print(f"{crontab:20s} -> 转换失败")

View File

@@ -0,0 +1,157 @@
import logging
import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ScheduledTaskManager:
def __init__(self):
"""初始化调度器"""
self.scheduler = BackgroundScheduler()
self.jobs = {}
# 添加任务
def _add_task(self, task_id,task_name, trigger, task_func, args=None, kwargs=None):
job = self.scheduler.add_job(
task_func, trigger, id=task_id,
args=args, kwargs=kwargs
)
self.jobs[task_id] = job
logger.info(f"已添加{task_name}任务 taskId: {task_id}, 执行时间: {trigger}")
def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None):
"""添加间隔任务(备用方案)
Args:
task_id: 任务唯一标识
interval_type: 'seconds', 'minutes', 'hours', 'days'
value: 间隔值
"""
from apscheduler.triggers.interval import IntervalTrigger
if interval_type == 'seconds':
trigger = IntervalTrigger(seconds=value)
elif interval_type == 'minutes':
trigger = IntervalTrigger(minutes=value)
elif interval_type == 'hours':
trigger = IntervalTrigger(hours=value)
elif interval_type == 'days':
trigger = IntervalTrigger(days=value)
else:
raise ValueError(f"不支持的间隔类型: {interval_type}")
job = self.scheduler.add_job(
task_func, trigger, id=task_id,
args=args, kwargs=kwargs
)
self.jobs[task_id] = job
logger.info(f"已添加间隔任务: {task_id}, 间隔: 每{value}{interval_type}")
# 移除任务
def remove_task(self, task_id):
"""移除任务"""
if task_id in self.jobs:
self.scheduler.remove_job(task_id)
del self.jobs[task_id]
logger.info(f"已移除任务: {task_id}")
# 暂停任务
def pause_task(self, task_id):
"""暂停任务"""
if task_id in self.jobs:
self.jobs[task_id].pause()
logger.info(f"已暂停任务: {task_id}")
# 重启任务
def resume_task(self, task_id):
"""恢复任务"""
if task_id in self.jobs:
self.jobs[task_id].resume()
logger.info(f"已恢复任务: {task_id}")
# 获取全部任务
def get_all_tasks(self):
"""获取所有任务信息"""
tasks_info = []
for job_id, job in self.jobs.items():
tasks_info.append({
'id': job_id,
'name': job.name,
'next_run_time': job.next_run_time,
'trigger': str(job.trigger)
})
return tasks_info
#启动任务
def start(self):
"""启动调度器"""
self.scheduler.start()
logger.info("调度器已启动")
# 关闭
def shutdown(self):
"""关闭调度器"""
self.scheduler.shutdown()
logger.info("调度器已关闭")
# 示例任务函数
def sample_task(task_name, *args, **kwargs):
"""示例任务函数"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{now}] 执行任务: {task_name}")
if args:
print(f"位置参数: {args}")
if kwargs:
print(f"关键字参数: {kwargs}")
print("-" * 50)
print()
# 使用示例
if __name__ == "__main__":
# 创建任务管理器
manager = ScheduledTaskManager()
# 启动调度器
manager.start()
try:
# 添加每日任务 (每天10:30执行)
# # 添加每月任务 (每月1日9:00执行)
# manager.add_monthly_task(
# task_id="monthly_summary",
# day=1,
# hour=9, 12354354345321.21135432213245322154373554
# minute=0,
# task_func=sample_task,
# args=("月度汇总",)
# )
#
# # 添加每小时任务 (每小时的第15分钟执行)
# manager.add_hourly_task(
# task_id="hourly_check",
# minute=15,
# task_func=sample_task,
# args=("每小时检查",)
# )
# 查看任务列表
print("当前任务列表:")
for task in manager.get_all_tasks():
print(f" - {task['id']}: 下次执行时间: {task['next_run_time']}")
# 保持程序运行
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n收到中断信号,正在关闭...")
finally:
manager.shutdown()

View File

@@ -1,329 +0,0 @@
"""
服务启动自动初始化模块
提供统一的服务启动初始化框架,支持:
1. 顺序初始化:按依赖顺序执行初始化任务
2. 并发初始化:无依赖的任务可并发执行
3. 失败重试:初始化失败可配置重试策略
4. 健康检查:初始化完成后进行健康检查
"""
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import List, Optional, Callable, Any, Dict
from dataclasses import dataclass, field
from enum import Enum
import traceback
logger = logging.getLogger(__name__)
class InitStatus(Enum):
"""初始化状态"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class InitResult:
"""初始化结果"""
name: str
status: InitStatus
message: str = ""
error: Optional[Exception] = None
duration_ms: float = 0.0
def __str__(self):
status_emoji = {
InitStatus.SUCCESS: "",
InitStatus.FAILED: "",
InitStatus.SKIPPED: "",
InitStatus.RUNNING: "",
}.get(self.status, "?")
return f"{status_emoji} {self.name}: {self.message}"
@dataclass
class InitTask:
"""初始化任务配置"""
name: str
func: Callable
dependencies: List[str] = field(default_factory=list)
enabled: bool = True
retry_times: int = 0
retry_delay: float = 1.0
critical: bool = True # 失败是否阻止服务启动
def __post_init__(self):
if not self.name:
raise ValueError("Task name cannot be empty")
class AppInitializer:
"""
应用初始化管理器
使用示例:
initializer = AppInitializer()
# 添加初始化任务
@initializer.task("init_database")
async def init_database():
# 数据库初始化逻辑
pass
@initializer.task("init_cache", dependencies=["init_database"])
async def init_cache():
# 缓存初始化逻辑(依赖数据库先初始化)
pass
# 执行初始化
results = await initializer.initialize()
"""
def __init__(self, app_name: str = "FastAPI App"):
self.app_name = app_name
self.tasks: Dict[str, InitTask] = {}
self.results: List[InitResult] = []
self._before_hooks: List[Callable] = []
self._after_hooks: List[Callable] = []
def task(
self,
name: str,
dependencies: Optional[List[str]] = None,
enabled: bool = True,
retry_times: int = 0,
retry_delay: float = 1.0,
critical: bool = True
) -> Callable:
"""
装饰器:注册初始化任务
Args:
name: 任务名称
dependencies: 依赖的任务名称列表
enabled: 是否启用
retry_times: 失败重试次数
retry_delay: 重试延迟(秒)
critical: 是否为关键任务(失败则阻止启动)
"""
def decorator(func: Callable) -> Callable:
self.tasks[name] = InitTask(
name=name,
func=func,
dependencies=dependencies or [],
enabled=enabled,
retry_times=retry_times,
retry_delay=retry_delay,
critical=critical
)
return func
return decorator
def add_task(
self,
name: str,
func: Callable,
dependencies: Optional[List[str]] = None,
enabled: bool = True,
retry_times: int = 0,
retry_delay: float = 1.0,
critical: bool = True
) -> None:
"""手动添加初始化任务"""
self.tasks[name] = InitTask(
name=name,
func=func,
dependencies=dependencies or [],
enabled=enabled,
retry_times=retry_times,
retry_delay=retry_delay,
critical=critical
)
def before_start(self, func: Callable) -> Callable:
"""装饰器:添加初始化前执行的钩子"""
self._before_hooks.append(func)
return func
def after_start(self, func: Callable) -> Callable:
"""装饰器:添加初始化后执行的钩子"""
self._after_hooks.append(func)
return func
async def initialize(self) -> List[InitResult]:
"""
执行所有初始化任务
Returns:
List[InitResult]: 初始化结果列表
"""
import time
start_time = time.time()
logger.info(f"{'='*60}")
logger.info(f"开始初始化 {self.app_name}")
logger.info(f"{'='*60}")
# 执行前置钩子
for hook in self._before_hooks:
try:
if asyncio.iscoroutinefunction(hook):
await hook()
else:
hook()
except Exception as e:
logger.error(f"前置钩子执行失败: {e}")
self.results = []
executed = set()
failed_critical = False
# 按拓扑顺序执行任务
while len(executed) < len(self.tasks):
# 找出所有可以执行的任务(依赖已满足或未启用)
ready_tasks = [
task for name, task in self.tasks.items()
if name not in executed and task.enabled and
all(dep in executed or dep not in self.tasks for dep in task.dependencies)
]
if not ready_tasks:
# 检查是否有未执行的任务
remaining = [name for name in self.tasks if name not in executed and self.tasks[name].enabled]
if remaining:
logger.error(f"存在循环依赖或未满足的依赖: {remaining}")
break
# 并发执行所有就绪的任务
results = await asyncio.gather(
*[self._execute_task(task) for task in ready_tasks],
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
logger.error(f"任务执行异常: {result}")
continue
self.results.append(result)
executed.add(result.name)
if result.status == InitStatus.FAILED and self.tasks[result.name].critical:
failed_critical = True
logger.error(f"关键任务 {result.name} 初始化失败,停止后续初始化")
# 执行后置钩子
if not failed_critical:
for hook in self._after_hooks:
try:
if asyncio.iscoroutinefunction(hook):
await hook()
else:
hook()
except Exception as e:
logger.error(f"后置钩子执行失败: {e}")
# 打印摘要
duration = time.time() - start_time
self._print_summary(duration, failed_critical)
return self.results
async def _execute_task(self, task: InitTask) -> InitResult:
"""执行单个初始化任务"""
import time
start_time = time.time()
result = InitResult(name=task.name, status=InitStatus.PENDING)
for attempt in range(task.retry_times + 1):
try:
result.status = InitStatus.RUNNING
logger.info(f"正在执行: {task.name}" + (f" (重试 {attempt}/{task.retry_times})" if attempt > 0 else ""))
# 执行任务
if asyncio.iscoroutinefunction(task.func):
await task.func()
else:
task.func()
result.status = InitStatus.SUCCESS
result.message = "初始化成功"
break
except Exception as e:
if attempt < task.retry_times:
logger.warning(f"{task.name} 失败,{task.retry_delay}秒后重试: {e}")
await asyncio.sleep(task.retry_delay)
else:
result.status = InitStatus.FAILED
result.message = f"初始化失败: {str(e)}"
result.error = e
logger.error(f"{task.name} 失败: {e}\n{traceback.format_exc()}")
result.duration_ms = (time.time() - start_time) * 1000
return result
def _print_summary(self, duration: float, failed_critical: bool) -> None:
"""打印初始化摘要"""
logger.info(f"{'='*60}")
logger.info(f"初始化完成 (耗时: {duration:.2f}秒)")
logger.info(f"{'='*60}")
# 统计
success_count = sum(1 for r in self.results if r.status == InitStatus.SUCCESS)
failed_count = sum(1 for r in self.results if r.status == InitStatus.FAILED)
skipped_count = sum(1 for r in self.results if r.status == InitStatus.SKIPPED)
logger.info(f"总计: {len(self.results)} 个任务")
logger.info(f" 成功: {success_count}")
logger.info(f" 失败: {failed_count}")
logger.info(f" 跳过: {skipped_count}")
# 详细结果
if self.results:
logger.info(f"\n详细结果:")
for result in self.results:
logger.info(f" {result}")
if failed_critical:
logger.error(f"{'='*60}")
logger.error(f"关键任务初始化失败,服务可能无法正常工作!")
logger.error(f"{'='*60}")
def get_results(self) -> List[InitResult]:
"""获取初始化结果"""
return self.results
def is_successful(self) -> bool:
"""检查是否所有关键任务都初始化成功"""
for result in self.results:
if result.status == InitStatus.FAILED:
task = self.tasks.get(result.name)
if task and task.critical:
return False
return True
# 全局初始化器实例
_global_initializer: Optional[AppInitializer] = None
def get_initializer() -> AppInitializer:
"""获取全局初始化器实例"""
global _global_initializer
if _global_initializer is None:
_global_initializer = AppInitializer()
return _global_initializer
def set_initializer(initializer: AppInitializer) -> None:
"""设置全局初始化器实例"""
global _global_initializer
_global_initializer = initializer