from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from datetime import datetime import time import logging # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ScheduledTaskManager: def __init__(self): """初始化调度器""" self.scheduler = BackgroundScheduler() self.jobs = {} # 添加“日”周期任务 def add_second_task(self, task_id,second, task_func, args=None, kwargs=None): trigger = CronTrigger(second=second) job = self.scheduler.add_job( task_func, trigger, id=task_id, args=args, kwargs=kwargs ) self.jobs[task_id] = job logger.info(f"已添加每秒任务: {task_id}, 执行时间: {second:02d}") # 添加“日”周期任务 def add_daily_task(self, task_id, hour, minute, task_func, args=None, kwargs=None): """添加每日定时任务 Args: task_id: 任务唯一标识 hour: 小时 (0-23) minute: 分钟 (0-59) task_func: 任务函数 args: 位置参数 kwargs: 关键字参数 """ trigger = CronTrigger(hour=hour, minute=minute) job = self.scheduler.add_job( task_func, trigger, id=task_id, args=args, kwargs=kwargs ) self.jobs[task_id] = job logger.info(f"已添加每日任务: {task_id}, 执行时间: {hour:02d}:{minute:02d}") # 添加“月”周期任务 def add_monthly_task(self, task_id, day, hour, minute, task_func, args=None, kwargs=None): """添加每月定时任务 Args: task_id: 任务唯一标识 day: 日 (1-31) hour: 小时 (0-23) minute: 分钟 (0-59) """ trigger = CronTrigger(day=day, hour=hour, minute=minute) job = self.scheduler.add_job( task_func, trigger, id=task_id, args=args, kwargs=kwargs ) self.jobs[task_id] = job logger.info(f"已添加每月任务: {task_id}, 执行时间: 每月{day}日 {hour:02d}:{minute:02d}") def add_hourly_task(self, task_id, minute, task_func, args=None, kwargs=None): """添加每小时定时任务 Args: task_id: 任务唯一标识 minute: 分钟 (0-59) """ trigger = CronTrigger(minute=minute) job = self.scheduler.add_job( task_func, trigger, id=task_id, args=args, kwargs=kwargs ) self.jobs[task_id] = job logger.info(f"已添加每小时任务: {task_id}, 执行时间: 每小时的{minute:02d}分") def add_interval_task(self, task_id, interval_type, value, task_func, args=None, kwargs=None): """添加间隔任务(备用方案) Args: task_id: 任务唯一标识 interval_type: 'seconds', 'minutes', 'hours', 'days' value: 间隔值 """ from apscheduler.triggers.interval import IntervalTrigger if interval_type == 'seconds': trigger = IntervalTrigger(seconds=value) elif interval_type == 'minutes': trigger = IntervalTrigger(minutes=value) elif interval_type == 'hours': trigger = IntervalTrigger(hours=value) elif interval_type == 'days': trigger = IntervalTrigger(days=value) else: raise ValueError(f"不支持的间隔类型: {interval_type}") job = self.scheduler.add_job( task_func, trigger, id=task_id, args=args, kwargs=kwargs ) self.jobs[task_id] = job logger.info(f"已添加间隔任务: {task_id}, 间隔: 每{value}{interval_type}") # 移除任务 def remove_task(self, task_id): """移除任务""" if task_id in self.jobs: self.scheduler.remove_job(task_id) del self.jobs[task_id] logger.info(f"已移除任务: {task_id}") # 暂停任务 def pause_task(self, task_id): """暂停任务""" if task_id in self.jobs: self.jobs[task_id].pause() logger.info(f"已暂停任务: {task_id}") # 重启任务 def resume_task(self, task_id): """恢复任务""" if task_id in self.jobs: self.jobs[task_id].resume() logger.info(f"已恢复任务: {task_id}") # 获取全部任务 def get_all_tasks(self): """获取所有任务信息""" tasks_info = [] for job_id, job in self.jobs.items(): tasks_info.append({ 'id': job_id, 'name': job.name, 'next_run_time': job.next_run_time, 'trigger': str(job.trigger) }) return tasks_info #启动任务 def start(self): """启动调度器""" self.scheduler.start() logger.info("调度器已启动") # 关闭 def shutdown(self): """关闭调度器""" self.scheduler.shutdown() logger.info("调度器已关闭") # 示例任务函数 def sample_task(task_name, *args, **kwargs): """示例任务函数""" now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{now}] 执行任务: {task_name}") if args: print(f"位置参数: {args}") if kwargs: print(f"关键字参数: {kwargs}") print("-" * 50) print() # 使用示例 if __name__ == "__main__": # 创建任务管理器 manager = ScheduledTaskManager() # 启动调度器 manager.start() try: # 添加每日任务 (每天10:30执行) manager.add_second_task( task_id="daily_report", second=10, task_func=sample_task, args=("每日报告",), kwargs={"type": "second"} ) # # # 添加每月任务 (每月1日9:00执行) # manager.add_monthly_task( # task_id="monthly_summary", # day=1, # hour=9, 12354354345321.21135432213245322154373554 # minute=0, # task_func=sample_task, # args=("月度汇总",) # ) # # # 添加每小时任务 (每小时的第15分钟执行) # manager.add_hourly_task( # task_id="hourly_check", # minute=15, # task_func=sample_task, # args=("每小时检查",) # ) # 查看任务列表 print("当前任务列表:") for task in manager.get_all_tasks(): print(f" - {task['id']}: 下次执行时间: {task['next_run_time']}") # 保持程序运行 while True: time.sleep(1) except KeyboardInterrupt: print("\n收到中断信号,正在关闭...") finally: manager.shutdown()