我的需求如下:现有一批任务,使用进程池执行,每个任务执行耗时不一样,任务并发执行期间,需要每隔一段时间监控任务执行进度
直接贴代码:
import multiprocessing
import time
import random
from multiprocessing import Pool, Managerdef worker(task_id, task_status, lock):"""任务执行函数,模拟不同耗时操作"""try:# 记录任务开始状态with lock:task_status[task_id] = "running"# 随机生成任务执行时间(30秒到30分钟)execution_time = random.uniform(1, 30)time.sleep(execution_time)# 更新任务完成状态with lock:task_status[task_id] = "completed"return f"Task {task_id} completed"except Exception as e:with lock:task_status[task_id] = f"failed: {str(e)}"return f"Task {task_id} failed"def monitor_tasks(task_status, total_tasks, lock):"""任务监控函数,每30秒输出状态"""while True:time.sleep(10)with lock:running = [k for k, v in task_status.items() if v == "running"]completed = [k for k, v in task_status.items() if v == "completed"]failed = [k for k, v in task_status.items() if v == "failed"]remaining = total_tasks - len(running) - len(completed)print(f"\n=== 系统状态监控 ===")print(f"正在执行的任务数: {len(running)}")print(f"任务详情: {running}")print(f"剩余任务数: {remaining}")print(f"已完成任务数: {len(completed)}")print(f"失败任务数: {len(failed)}")print("====================")if __name__ == '__main__':# 初始化管理器manager = Manager()lock = manager.Lock()# 创建任务状态字典(任务ID: 状态)task_status = manager.dict()total_tasks = 100# 初始化所有任务状态为"pending"for i in range(total_tasks):task_status[i] = "pending"# 创建进程池with Pool(processes=16) as pool:# 提交所有任务for task_id in range(total_tasks):pool.apply_async(worker, args=(task_id, task_status, lock))# 启动监控线程monitor_process = multiprocessing.Process(target=monitor_tasks,args=(task_status, total_tasks, lock))monitor_process.start()# 等待所有任务完成pool.close()pool.join()# 终止监控进程monitor_process.terminate()monitor_process.join()